Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

resolve inconsistencies detected during ring merge #3637

Merged
merged 7 commits into from
Jun 14, 2019
74 changes: 60 additions & 14 deletions ipam/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,22 @@ func (r *Ring) Merge(gossip Ring) (bool, error) {
return false, ErrDifferentRange
}

result, updated, err := r.Entries.merge(gossip.Entries, r.Peer)
result, updated, err := r.Entries.merge(gossip.Entries, r.Peer, r)

if err != nil {
return false, err
}

// reset the free space for entries if there is invalid free space
// due to accepting an unexpected update from the peers
for i := 0; i < len(result); i++ {
distance := r.distance(result.entry(i).Token, result.entry(i+1).Token)
if result.entry(i).Peer == r.Peer && result.entry(i).Free > distance {
// case that can arise when a range that we own that got split and had no allocations
result.entry(i).Free = distance
}
}

if err := r.checkEntries(result); err != nil {
return false, fmt.Errorf("Merge of incoming data causes: %s", err)
}
Expand All @@ -260,7 +270,7 @@ func (r *Ring) Merge(gossip Ring) (bool, error) {
// entries belonging to ourPeer. Returns the merged entries and an
// indication whether the merge resulted in any changes, i.e. the
// result differs from the original.
func (es entries) merge(other entries, ourPeer mesh.PeerName) (result entries, updated bool, err error) {
func (es entries) merge(other entries, ourPeer mesh.PeerName, r *Ring) (result entries, updated bool, err error) {
var mine, theirs *entry
var previousOwner *mesh.PeerName
addToResult := func(e entry) { result = append(result, &e) }
Expand All @@ -270,6 +280,14 @@ func (es entries) merge(other entries, ourPeer mesh.PeerName) (result entries, u
previousOwner = nil
}

checkEntryHasAllocations := func(current, next *entry) bool {
size := r.distance(current.Token, next.Token)
if current.Free < size {
return true
}
return false
}

// i is index into es; j is index into other
var i, j int
for i < len(es) && j < len(other) {
Expand All @@ -282,29 +300,57 @@ func (es entries) merge(other entries, ourPeer mesh.PeerName) (result entries, u
case mine.Token > theirs.Token:
// insert, checking that a range owned by us hasn't been split
if previousOwner != nil && *previousOwner == ourPeer && theirs.Peer != ourPeer {
err = errEntryInMyRange(theirs)
return
// check we have no allocations in the range that got split
if checkEntryHasAllocations(es.entry(i-1), mine) {
murali-reddy marked this conversation as resolved.
Show resolved Hide resolved
err = errEntryInMyRange(theirs)
return
}
}
addTheirs(*theirs)
j++
case mine.Token == theirs.Token:
common.Log.Debugln(fmt.Sprintf("[ring %s]: Merge token=%s mine.Peer=%s theirs.Peer=%s mine.Version=%s theirs.Version=%s", ourPeer, mine.Token, mine.Peer, theirs.Peer, fmt.Sprint(mine.Version), fmt.Sprint(theirs.Version)))
// merge
switch {
case mine.Version >= theirs.Version:
if mine.Version == theirs.Version && !mine.Equal(theirs) {
err = errInconsistentEntry(mine, theirs)
return
if mine.Peer == ourPeer {
// if we own the entry and has allocations
if checkEntryHasAllocations(mine, es.entry(i+1)) {
err = errInconsistentEntry(mine, theirs)
return
}
}
// tie-break here, pick the entry with the highest free count
if mine.Free >= theirs.Free {
addToResult(*mine)
previousOwner = &mine.Peer
} else {
addTheirs(*theirs)
}
} else {
addToResult(*mine)
previousOwner = &mine.Peer
}
addToResult(*mine)
previousOwner = &mine.Peer
common.Log.Debugln(fmt.Sprintf("[ring %s]: Merge token=%s mine.Peer=%s theirs.Peer=%s mine.Version=%s theirs.Version=%s", ourPeer, mine.Token, mine.Peer, theirs.Peer, fmt.Sprint(mine.Version), fmt.Sprint(theirs.Version)))
case mine.Version < theirs.Version:
if mine.Peer == ourPeer { // We shouldn't receive updates to our own tokens
err = errNewerVersion(mine, theirs)
return
if mine.Peer == ourPeer {
// We received update to our own tokens accept the received entry
// if either it belongs to a different peer and we do not have allocations
// in the range effectively given away, or it belongs to our own peer, in
// which case we should set our version to the one received plus one,
// effectively imposing our existing entry.
if theirs.Peer != ourPeer && !checkEntryHasAllocations(mine, es.entry(i+1)) {
addTheirs(*theirs)
murali-reddy marked this conversation as resolved.
Show resolved Hide resolved
} else if theirs.Peer == ourPeer {
mine.Version = theirs.Version + 1
addToResult(*mine)
} else {
err = errNewerVersion(mine, theirs)
return
}
} else {
addTheirs(*theirs)
}
addTheirs(*theirs)
common.Log.Debugln(fmt.Sprintf("[ring %s]: Merge token=%s mine.Peer=%s theirs.Peer=%s mine.Version=%s theirs.Version=%s", ourPeer, mine.Token, mine.Peer, theirs.Peer, fmt.Sprint(mine.Version), fmt.Sprint(theirs.Version)))
}
i++
j++
Expand Down
62 changes: 55 additions & 7 deletions ipam/ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,61 @@ func TestMergeSimple(t *testing.T) {
require.Equal(t, ring2.Entries, ring1.Entries)
}

func TestMergeWithConflicts(t *testing.T) {

// received update to entry with a token and version identical to an entry we have
// but but holding different content, if there are no allocations, resolve the
// conflict by picking entry one with high free count
ring1 := NewRing(start, end, peer1name)
ring2 := NewRing(start, end, peer2name)
ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 256, Version: 1}}
ring2.Entries = []*entry{{Token: start, Peer: peer2name, Free: 256, Version: 1}}
require.NoError(t, merge(ring1, ring2))

// received update to entry with a token and version identical to an entry we have
// but but holding different content, if there are allocations, then reject the received
// entry
ring1 = NewRing(start, end, peer1name)
ring2 = NewRing(start, end, peer2name)
ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 128, Version: 1}}
ring2.Entries = []*entry{{Token: start, Peer: peer2name, Free: 256, Version: 1}}
require.Error(t, merge(ring1, ring2), "Expected error")

// received an entry with update to one of our own tokens and with new version,
// accept the received entry if its still going to belong to us
ring1 = NewRing(start, end, peer1name)
ring2 = NewRing(start, end, peer2name)
ring1.Entries = []*entry{{Token: start, Peer: peer1name}}
ring2.Entries = []*entry{{Token: start, Peer: peer1name, Version: 1}}
require.NoError(t, merge(ring1, ring2))

// received an entry with update to one of our own tokens and with new version,
// but belongs to a different peer, accept the received entry if we do not
// have allocations in the range
ring1 = NewRing(start, end, peer1name)
ring2 = NewRing(start, end, peer2name)
ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 256}}
ring2.Entries = []*entry{{Token: start, Peer: peer2name, Free: 256, Version: 1}}
require.NoError(t, merge(ring1, ring2))

// received an entry with update to one of our own tokens and with new version,
// reject received entry if we have allocations in the range
ring1 = NewRing(start, end, peer1name)
ring2 = NewRing(start, end, peer2name)
ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 128}}
ring2.Entries = []*entry{{Token: start, Peer: peer2name, Version: 1}}
require.Error(t, merge(ring1, ring2), "Expected error")

// we receive an entry that splits one of our ranges, giving some of it
// away to another peer. accept the entry provided that we have no allocations
// in the range that got given away.
ring1 = NewRing(start, end, peer1name)
ring2 = NewRing(start, end, peer2name)
ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 256, Version: 1}}
ring2.Entries = []*entry{{Token: start, Peer: peer1name, Free: 128, Version: 2}, {Token: middle, Peer: peer2name, Free: 128}}
require.NoError(t, merge(ring1, ring2))
}

func TestMergeErrors(t *testing.T) {
// Cannot Merge in an invalid ring
ring1 := NewRing(start, end, peer1name)
Expand All @@ -231,13 +286,6 @@ func TestMergeErrors(t *testing.T) {
ring2.Entries = []*entry{}
require.True(t, merge(ring1, ring2) == ErrDifferentRange, "Expected ErrDifferentRange")

// Cannot Merge newer version of entry I own
ring2 = NewRing(start, end, peer2name)
ring1.Entries = []*entry{{Token: start, Peer: peer1name}}
ring2.Entries = []*entry{{Token: start, Peer: peer1name, Version: 1}}
fmt.Println(merge(ring1, ring2))
require.Error(t, merge(ring1, ring2), "Expected error")

// Cannot Merge two entries with same version but different hosts
ring1.Entries = []*entry{{Token: start, Peer: peer1name}}
ring2.Entries = []*entry{{Token: start, Peer: peer2name}}
Expand Down