sticky: fix extreme edge case for complex balancing
I've been trying to trigger this edge case since I first thought of the
complex balancing: this was the original purpose of the "odd pyramid"
test case.

It is possible for a series of steals to result in one member being
stolen from, only for it to need to steal partitions back from the
original stealing member.

This can be triggered in the new test added for this commit. Since
iteration order is hard or impossible to guarantee, explicitly triggering
this behavior requires adding a sort when balancing so that the min
member chosen to balance to is always picked in alphabetical order.
If that is done, then B steals from A, only for A to later steal from B.

The problem this triggers is that when A steals back from B, it can
steal a different partition than the one it lost. If the partition it
lost was one it was consuming previously, we lose some stickiness.
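
For example (member and partition names hypothetical): say A was
consuming p0 and B steals it while balancing. If A later steals back
from B and takes some other partition p2 rather than p0, both p0 and p2
end up away from their prior consumers, whereas taking p0 back would
have been a net no-op for p0.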

A fix for this is, when stealing, to prefer partitions that the member
originally owned.

To do this, we keep track of the original owner, but that alone is not
enough. We must also **prefer** search paths that steal back to original
owners. The original-owner search path could actually have a longer
distance than otherwise, so we now have to fix the heap when we update a
search path.
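
To illustrate (a minimal, self-contained sketch with invented names, not
this package's actual types): a queued node is normally never revisited,
but if a later edge into it would return a partition to its original
owner, we re-parent it, and because the heap now orders by that
preference before distance, we must fix its position in place:

    package main

    import (
        "container/heap"
        "fmt"
    )

    type node struct {
        name          string
        distance      int
        srcIsOriginal bool // does the edge into us return a partition to its original owner?
        heapIdx       int  // position in the heap, maintained by Push and Swap
    }

    type nodeHeap []*node

    func (h nodeHeap) Len() int { return len(h) }
    func (h nodeHeap) Less(i, j int) bool {
        l, r := h[i], h[j]
        if l.srcIsOriginal != r.srcIsOriginal {
            return l.srcIsOriginal // prefer original-owner paths over shorter ones
        }
        return l.distance < r.distance
    }
    func (h nodeHeap) Swap(i, j int) {
        h[i], h[j] = h[j], h[i]
        h[i].heapIdx, h[j].heapIdx = i, j
    }
    func (h *nodeHeap) Push(x interface{}) {
        n := x.(*node)
        n.heapIdx = len(*h)
        *h = append(*h, n)
    }
    func (h *nodeHeap) Pop() interface{} {
        old := *h
        n := old[len(old)-1]
        *h = old[:len(old)-1]
        return n
    }

    func main() {
        b := &node{name: "B", distance: 2}
        c := &node{name: "C", distance: 1}
        h := &nodeHeap{}
        heap.Push(h, b)
        heap.Push(h, c)

        // A second edge into B turns out to steal a partition back to its
        // original owner. B's distance is unchanged (it may even be longer),
        // but its priority improved, so its heap position must be fixed.
        b.srcIsOriginal = true
        heap.Fix(h, b.heapIdx)

        fmt.Println(heap.Pop(h).(*node).name) // B now pops before the closer C
    }

With uniform edge weights, plain Dijkstra never needs to update a node
that is already queued; the original-owner preference is the only reason
an in-place heap.Fix is needed here, and it is why each path score must
now carry its heap index.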

There are some more comments within the commit that hopefully explain
this a bit more fully. This commit closes out the last aspect of the
graph search that has been on my mind for some time. I'm hoping that
this addresses the final aspect of path finding & stealing such that the
algorithm is now perfect, but it's difficult to say; realistically, I
would need to prove the algorithm to be sure (which would be rather
difficult for me).

Realistically, no join pattern would actually trigger this behavior, but
I'm happy that it's addressed.

There is a bit of slowdown from having one more comparison when heap
searching, as well as from the slightly larger allocations.

name                           old time/op    new time/op    delta
Large-8                          2.14ms ± 0%    2.13ms ± 1%    ~     (p=0.156 n=9+10)
LargeWithExisting-8              8.65ms ± 1%    8.67ms ± 1%    ~     (p=0.353 n=10+10)
LargeImbalanced-8                2.89ms ± 3%    3.00ms ± 6%  +3.86%  (p=0.001 n=9+9)
LargeWithExistingImbalanced-8    8.75ms ± 0%    8.79ms ± 1%    ~     (p=0.065 n=9+10)
Java/large-8                      160ms ± 1%     160ms ± 1%    ~     (p=0.393 n=10+10)
Java/large_imbalance-8            193ms ± 3%     197ms ± 4%  +2.29%  (p=0.043 n=10+10)
Java/medium-8                    8.82ms ± 1%    8.98ms ± 2%  +1.85%  (p=0.000 n=10+10)
Java/medium_imbalance-8          10.4ms ± 2%    10.8ms ± 1%  +4.17%  (p=0.000 n=10+10)
Java/small-8                     6.80ms ± 2%    7.00ms ± 2%  +2.98%  (p=0.000 n=10+10)
Java/small_imbalance-8           8.32ms ± 2%    8.72ms ± 3%  +4.85%  (p=0.000 n=10+10)

name                           old alloc/op   new alloc/op   delta
Large-8                          1.92MB ± 0%    1.96MB ± 0%  +2.57%  (p=0.000 n=9+10)
LargeWithExisting-8              3.69MB ± 0%    3.74MB ± 0%  +1.39%  (p=0.000 n=10+10)
LargeImbalanced-8                2.12MB ± 1%    2.17MB ± 1%  +2.51%  (p=0.000 n=10+10)
LargeWithExistingImbalanced-8    3.72MB ± 0%    3.77MB ± 0%  +1.30%  (p=0.000 n=10+10)
Java/large-8                      127MB ± 0%     129MB ± 0%  +1.58%  (p=0.000 n=9+8)
Java/large_imbalance-8            131MB ± 0%     133MB ± 0%  +1.54%  (p=0.000 n=9+9)
Java/medium-8                    7.78MB ± 0%    7.88MB ± 0%  +1.26%  (p=0.000 n=10+10)
Java/medium_imbalance-8          8.05MB ± 0%    8.16MB ± 0%  +1.32%  (p=0.000 n=10+10)
Java/small-8                     6.20MB ± 0%    6.28MB ± 0%  +1.32%  (p=0.000 n=10+10)
Java/small_imbalance-8           6.42MB ± 0%    6.51MB ± 0%  +1.38%  (p=0.000 n=10+7)

name                           old allocs/op  new allocs/op  delta
Large-8                             335 ± 0%       335 ± 0%    ~     (p=0.429 n=6+10)
LargeWithExisting-8               18.6k ± 1%     18.6k ± 0%    ~     (p=0.739 n=10+10)
LargeImbalanced-8                   786 ± 5%       786 ± 4%    ~     (p=0.870 n=10+10)
LargeWithExistingImbalanced-8     18.4k ± 0%     18.4k ± 0%    ~     (p=0.518 n=10+7)
Java/large-8                      6.04k ± 0%     6.04k ± 0%    ~     (all equal)
Java/large_imbalance-8            7.43k ± 0%     7.42k ± 0%    ~     (p=0.072 n=9+8)
Java/medium-8                     3.03k ± 0%     3.03k ± 0%    ~     (all equal)
Java/medium_imbalance-8           3.15k ± 0%     3.15k ± 0%    ~     (p=0.322 n=10+10)
Java/small-8                      2.46k ± 0%     2.46k ± 0%    ~     (p=0.591 n=10+9)
Java/small_imbalance-8            2.55k ± 0%     2.55k ± 0%    ~     (p=0.232 n=10+9)
twmb committed May 4, 2021
1 parent 9ada82d commit d1341ae
Showing 3 changed files with 224 additions and 75 deletions.
96 changes: 69 additions & 27 deletions pkg/kgo/internal/sticky/graph.go
@@ -14,10 +14,10 @@ type graph struct {
     out [][]uint32
 
     // edge => who owns this edge; built in balancer's assignUnassigned
-    cxns []uint16
+    cxns []partitionConsumer
 
     // scores are all node scores from a seach node. The distance field
-    // is reset on findSteal to noScore.
+    // is reset on findSteal to infinityScore..
     scores pathScores
 
     // heapBuf and pathBuf are backing buffers that are reused every
@@ -28,7 +28,7 @@ type graph struct {
 }
 
 func (b *balancer) newGraph(
-    partitionConsumers []uint16,
+    partitionConsumers []partitionConsumer,
     topicPotentials [][]uint16,
 ) graph {
     g := graph{
@@ -56,15 +56,15 @@ func (b *balancer) newGraph(
 }
 
 func (g *graph) changeOwnership(edge int32, newDst uint16) {
-    g.cxns[edge] = newDst
+    g.cxns[edge].memberNum = newDst
 }
 
 // findSteal uses Dijkstra search to find a path from the best node it can reach.
 func (g *graph) findSteal(from uint16) ([]stealSegment, bool) {
     // First, we must reset our scores from any prior run. This is O(M),
     // but is fast and faster than making a map and extending it a lot.
     for i := range g.scores {
-        g.scores[i].distance = noScore
+        g.scores[i].distance = infinityScore
         g.scores[i].done = false
     }
 
@@ -97,28 +97,40 @@ func (g *graph) findSteal(from uint16) ([]stealSegment, bool) {
         info := g.b.topicInfos[topicNum]
         firstPartNum, lastPartNum := info.partNum, info.partNum+info.partitions
         for edge := firstPartNum; edge < lastPartNum; edge++ {
-            neighborNode := g.cxns[edge]
+            neighborNode := g.cxns[edge].memberNum
             neighbor, isNew := g.getScore(neighborNode)
             if neighbor.done {
                 continue
             }
 
             distance := current.distance + 1
-            // If our neghbor distance is less or equal, then we can
-            // reach the neighbor through a previous route we have
-            // tried and should not try again.
-            if distance < neighbor.distance {
+
+            // The neighbor is the current node that owns this edge.
+            // If our node originally owned this partition, then it
+            // would be preferable to steal edge back.
+            srcIsOriginal := g.cxns[edge].originalNum == current.node
+
+            // If this is a new neighbor (our first time seeing the neighbor
+            // in our search), this is also the shortest path to reach them,
+            // where shortest defers preference to original sources THEN distance.
+            if isNew {
                 neighbor.parent = current
+                neighbor.srcIsOriginal = srcIsOriginal
                 neighbor.srcEdge = edge
                 neighbor.distance = distance
-                if isNew {
-                    heap.Push(rem, neighbor)
-                }
-
-                // We never need to fix the heap position.
-                // Our level is static, and once we set
-                // distance, it is the minimum it will be
-                // and we never revisit the neighbor.
+                neighbor.heapIdx = len(*rem)
+                heap.Push(rem, neighbor)
+
+            } else if !neighbor.srcIsOriginal && srcIsOriginal {
+                // If the search path has seen this neighbor before, but
+                // we now are evaluating a partition that would increase
+                // stickiness if stolen, then fixup the neighbor's parent
+                // and srcEdge.
+                neighbor.parent = current
+                neighbor.srcIsOriginal = true
+                neighbor.srcEdge = edge
+                neighbor.distance = distance
+                heap.Fix(rem, neighbor.heapIdx)
             }
         }
     }
@@ -133,23 +145,39 @@ type stealSegment struct {
     part int32 // partNum
 }
 
+// As we traverse a graph, we assign each node a path score, which tracks a few
+// numbers for what it would take to reach this node from our first node.
 type pathScore struct {
-    done bool
-    node uint16 // member num
+    // Done is set to true when we pop a node off of the graph. Once we
+    // pop a node, it means we have found a best path to that node and
+    // we do not want to revisit it for processing if any other future
+    // nodes reach back to this one.
+    done bool
+
+    // srcIsOriginal is true if, were our parent to steal srcEdge, would
+    // that put srcEdge back on the original member. That is, if we are B
+    // and our parent is A, does our srcEdge originally belong do A?
+    //
+    // This field exists to work around a very slim edge case where a
+    // partition is stolen by B and then needs to be stolen back by A
+    // later.
+    srcIsOriginal bool
+
+    node uint16 // our member num
     distance int32 // how many steals it would take to get here
-    srcEdge int32 // partNum
+    srcEdge int32 // the partition used to reach us
     level int32 // partitions owned on this segment
     parent *pathScore
+    heapIdx int
 }
 
 type pathScores []pathScore
 
-const noScore = -1
+const infinityScore = 1<<31 - 1
 
 func (g *graph) getScore(node uint16) (*pathScore, bool) {
     r := &g.scores[node]
-    exists := r.distance != noScore
+    exists := r.distance != infinityScore
     if !exists {
         *r = pathScore{
             node: node,
@@ -165,14 +193,28 @@ type pathHeap []*pathScore
 func (p *pathHeap) Len() int { return len(*p) }
 func (p *pathHeap) Swap(i, j int) {
     h := *p
-    h[i], h[j] = h[j], h[i]
+    l, r := h[i], h[j]
+    l.heapIdx, r.heapIdx = r.heapIdx, l.heapIdx
+    h[i], h[j] = r, l
 }
 
+// For our path, we always want to prioritize stealing a partition we
+// originally owned. This may result in a longer steal path, but it will
+// increase stickiness.
+//
+// Next, our real goal, which is to find a node we can steal from. Because of
+// this, we always want to sort by the highest level. The pathHeap stores
+// reachable paths, so by sorting by the highest level, we terminate quicker:
+// we always check the most likely candidates to quit our search.
+//
+// Finally, we simply prefer searching through shorter paths and, barring that,
+// just sort by node.
 func (p *pathHeap) Less(i, j int) bool {
     l, r := (*p)[i], (*p)[j]
-    return l.level > r.level || l.level == r.level &&
-        (l.distance < r.distance || l.distance == r.distance &&
-            l.node < r.node)
+    return l.srcIsOriginal && !r.srcIsOriginal || !l.srcIsOriginal && !r.srcIsOriginal &&
+        (l.level > r.level || l.level == r.level &&
+            (l.distance < r.distance || l.distance == r.distance &&
+                l.node < r.node))
 }
 
 func (p *pathHeap) Push(x interface{}) { *p = append(*p, x.(*pathScore)) }
81 changes: 33 additions & 48 deletions pkg/kgo/internal/sticky/sticky.go
@@ -132,8 +132,8 @@ func (b *balancer) into() Plan {
     topics := make(map[string][]int32, ntopics)
     plan[member] = topics
 
-    // partInfos is created by topic, and partNums refers to
-    // indices in partInfos. If we sort by partNum, we have sorted
+    // partOwners is created by topic, and partNums refers to
+    // indices in partOwners. If we sort by partNum, we have sorted
     // topics and partitions.
     sort.Sort(&partNums)
 
@@ -158,7 +158,6 @@ func (b *balancer) into() Plan {
         topicParts = append(topicParts, int32(partition))
     }
     topics[lastTopicInfo.topic] = topicParts[:len(topicParts):len(topicParts)]
-    topicParts = topicParts[len(topicParts):]
 }
 return plan
 }
@@ -346,13 +345,6 @@ type memberGeneration struct {
     generation int32
 }
 
-// for alloc avoidance since it is easy enough.
-type memberGenerations []memberGeneration
-
-func (m *memberGenerations) Len() int { return len(*m) }
-func (m *memberGenerations) Less(i, j int) bool { s := *m; return s[i].generation > s[j].generation }
-func (m *memberGenerations) Swap(i, j int) { s := *m; s[i], s[j] = s[j], s[i] }
-
 type topicPartition struct {
     topic string
     partition int32
@@ -398,7 +390,7 @@ func (b *balancer) assignUnassignedAndInitGraph() {
     // below in the partition mapping. Doing this two step process allows
     // for a 10x speed boost rather than ranging over all partitions many
     // times.
-    membersBufs := make([]uint16, len(b.topicNums)*len(b.members))
+    topicPotentialsBuf := make([]uint16, len(b.topicNums)*len(b.members))
     topicPotentials := make([][]uint16, len(b.topicNums))
     for memberNum, member := range b.members {
         for _, topic := range member.Topics {
@@ -408,8 +400,8 @@ func (b *balancer) assignUnassignedAndInitGraph() {
             }
             memberNums := topicPotentials[topicNum]
             if cap(memberNums) == 0 {
-                memberNums = membersBufs[:0:len(b.members)]
-                membersBufs = membersBufs[len(b.members):]
+                memberNums = topicPotentialsBuf[:0:len(b.members)]
+                topicPotentialsBuf = topicPotentialsBuf[len(b.members):]
             }
             topicPotentials[topicNum] = append(memberNums, uint16(memberNum))
         }
@@ -432,16 +424,15 @@ func (b *balancer) assignUnassignedAndInitGraph() {
     // Next, over the prior plan, un-map deleted topics or topics that
     // members no longer want. This is where we determine what is now
     // unassigned.
-    partitionConsumers := make([]uint16, cap(b.partOwners)) // partNum => consuming member
+    partitionConsumers := make([]partitionConsumer, cap(b.partOwners)) // partNum => consuming member
     for i := range partitionConsumers {
-        partitionConsumers[i] = unassignedPart
+        partitionConsumers[i] = partitionConsumer{unassignedPart, unassignedPart}
     }
     for memberNum := range b.plan {
         partNums := &b.plan[memberNum]
         for _, partNum := range *partNums {
             topicNum := b.partOwners[partNum]
             if len(topicPotentials[topicNum]) == 0 { // all prior subscriptions stopped wanting this partition
-                partitionConsumers[partNum] = deletedPart
                 partNums.remove(partNum)
                 continue
             }
@@ -457,7 +448,7 @@ func (b *balancer) assignUnassignedAndInitGraph() {
                 partNums.remove(partNum)
                 continue
             }
-            partitionConsumers[partNum] = uint16(memberNum)
+            partitionConsumers[partNum] = partitionConsumer{uint16(memberNum), uint16(memberNum)}
         }
     }
 
@@ -467,7 +458,7 @@ func (b *balancer) assignUnassignedAndInitGraph() {
     }
 
     for partNum, owner := range partitionConsumers {
-        if owner != unassignedPart {
+        if owner.memberNum != unassignedPart {
             continue
         }
         potentials := topicPotentials[b.partOwners[partNum]]
@@ -477,7 +468,7 @@ func (b *balancer) assignUnassignedAndInitGraph() {
         assigned := potentials[0]
         b.plan[assigned].add(int32(partNum))
         (&membersByPartitions{potentials, b.plan}).fix0()
-        partitionConsumers[partNum] = assigned
+        partitionConsumers[partNum].memberNum = assigned
     }
 
     // Lastly, with everything assigned, we build our steal graph for
@@ -490,20 +481,9 @@ func (b *balancer) assignUnassignedAndInitGraph() {
     }
 }
 
-const (
-    // deletedPart and unassignedPart are fake member numbers that we use
-    // to track if a partition is deleted or unassigned.
-    //
-    // deletedPart is technically unneeded; if no member wants a partition,
-    // no member will be seen as a potential for taking it, so tracking
-    // that it was deleted is unnecessary. We do though just to be
-    // explicit.
-    //
-    // unassignedPart is the default of partitions until we process what
-    // members say they were assigned prior.
-    deletedPart = math.MaxUint16
-    unassignedPart = math.MaxUint16 - 1
-)
+// unassignedPart is a fake member number that we use to track if a partition
+// is deleted or unassigned.
+const unassignedPart = math.MaxUint16 - 1
 
 // tryRestickyStales is a pre-assigning step where, for all stale members,
 // we give partitions back to them if the partition is currently on an
@@ -512,7 +492,7 @@ const (
 // This effectively re-stickies members before we balance further.
 func (b *balancer) tryRestickyStales(
     topicPotentials [][]uint16,
-    partitionConsumers []uint16,
+    partitionConsumers []partitionConsumer,
 ) {
     for staleNum, lastOwnerNum := range b.stales {
         potentials := topicPotentials[b.partOwners[staleNum]] // there must be a potential consumer if we are here
@@ -533,7 +513,7 @@ func (b *balancer) tryRestickyStales(
         // must be on a different owner (cannot be lastOwner),
         // otherwise it would not be a lastOwner in the stales
         // map; it would just be the current owner.
-        currentOwner := partitionConsumers[staleNum]
+        currentOwner := partitionConsumers[staleNum].memberNum
         lastOwnerPartitions := &b.plan[lastOwnerNum]
         currentOwnerPartitions := &b.plan[currentOwner]
         if lastOwnerPartitions.len()+1 < currentOwnerPartitions.len() {
@@ -543,6 +523,11 @@ func (b *balancer) tryRestickyStales(
     }
 }
 
+type partitionConsumer struct {
+    memberNum uint16
+    originalNum uint16
+}
+
 // While assigning, we keep members per topic heap sorted by the number of
 // partitions they are currently consuming. This allows us to have quick
 // assignment vs. always scanning to see the min loaded member.
@@ -605,14 +590,14 @@ func (b *balancer) balance() {
         return
     }
 
-    minRem := min.members
-    maxRem := max.members
-    for len(minRem) > 0 && len(maxRem) > 0 {
-        dst := minRem[0]
-        src := maxRem[0]
+    minMems := min.members
+    maxMems := max.members
+    for len(minMems) > 0 && len(maxMems) > 0 {
+        dst := minMems[0]
+        src := maxMems[0]
 
-        minRem = minRem[1:]
-        maxRem = maxRem[1:]
+        minMems = minMems[1:]
+        maxMems = maxMems[1:]
 
         srcPartitions := &b.plan[src]
         dstPartitions := &b.plan[dst]
@@ -623,14 +608,14 @@ func (b *balancer) balance() {
     nextUp := b.findLevel(min.level + 1)
     nextDown := b.findLevel(max.level - 1)
 
-    upEnd := len(min.members) - len(minRem)
-    downEnd := len(max.members) - len(maxRem)
+    endOfUps := len(min.members) - len(minMems)
+    endOfDowns := len(max.members) - len(maxMems)
 
-    nextUp.members = append(nextUp.members, min.members[:upEnd]...)
-    nextDown.members = append(nextDown.members, max.members[:downEnd]...)
+    nextUp.members = append(nextUp.members, min.members[:endOfUps]...)
+    nextDown.members = append(nextDown.members, max.members[:endOfDowns]...)
 
-    min.members = min.members[upEnd:]
-    max.members = max.members[downEnd:]
+    min.members = min.members[endOfUps:]
+    max.members = max.members[endOfDowns:]
 
     if len(min.members) == 0 {
         b.planByNumPartitions.Delete(b.planByNumPartitions.Min())