Commit d1341ae

sticky: fix extreme edge case for complex balancing
I've been trying to trigger this edge case since I first thought of the complex balancing: this was the original purpose of the "odd pyramid" test case. It is possible for a series of steals to result in one member being stolen from, only for it to need to steal partitions back from the original stealing member.

This can be triggered in the new test added for this commit. Since iteration order is hard (if not impossible) to guarantee, to trigger this behavior explicitly you would need to add a sort when balancing such that the min member chosen to balance to is always chosen in alphabetic order. If that is done, then B steals from A, only for A to later steal from B.

The problem this triggers is that when A steals back from B, it can steal a different partition than the one it lost. If the original partition it lost was one it was consuming prior, we lose some stickiness.

A fix for this is, when stealing, to prefer partitions that a member originally owned. To do this, we keep track of the original owner, but that alone is not enough: we must also **prefer** search paths that steal back to original owners. The original-owner search path could actually have a longer distance than otherwise, so we now have to fix the heap when we update a search path. There are some more comments within the commit that hopefully explain this a bit more fully.

This commit closes out the last aspect of the graph search that has been on my mind for some time. I'm hoping that this addresses the final aspect of path finding & stealing such that the algorithm is now perfect, but it's difficult to say, and realistically I would need to prove the algorithm to be sure (which would be rather difficult for me). Realistically, no join pattern would actually trigger this behavior, but I'm happy that it's addressed.

There is a bit of slowdown due to having one more comparison when heap searching, as well as from the slightly larger allocation:
name                           old time/op    new time/op    delta
Large-8                          2.14ms ± 0%    2.13ms ± 1%    ~     (p=0.156 n=9+10)
LargeWithExisting-8              8.65ms ± 1%    8.67ms ± 1%    ~     (p=0.353 n=10+10)
LargeImbalanced-8                2.89ms ± 3%    3.00ms ± 6%  +3.86%  (p=0.001 n=9+9)
LargeWithExistingImbalanced-8    8.75ms ± 0%    8.79ms ± 1%    ~     (p=0.065 n=9+10)
Java/large-8                      160ms ± 1%     160ms ± 1%    ~     (p=0.393 n=10+10)
Java/large_imbalance-8            193ms ± 3%     197ms ± 4%  +2.29%  (p=0.043 n=10+10)
Java/medium-8                    8.82ms ± 1%    8.98ms ± 2%  +1.85%  (p=0.000 n=10+10)
Java/medium_imbalance-8          10.4ms ± 2%    10.8ms ± 1%  +4.17%  (p=0.000 n=10+10)
Java/small-8                     6.80ms ± 2%    7.00ms ± 2%  +2.98%  (p=0.000 n=10+10)
Java/small_imbalance-8           8.32ms ± 2%    8.72ms ± 3%  +4.85%  (p=0.000 n=10+10)

name                           old alloc/op   new alloc/op   delta
Large-8                          1.92MB ± 0%    1.96MB ± 0%  +2.57%  (p=0.000 n=9+10)
LargeWithExisting-8              3.69MB ± 0%    3.74MB ± 0%  +1.39%  (p=0.000 n=10+10)
LargeImbalanced-8                2.12MB ± 1%    2.17MB ± 1%  +2.51%  (p=0.000 n=10+10)
LargeWithExistingImbalanced-8    3.72MB ± 0%    3.77MB ± 0%  +1.30%  (p=0.000 n=10+10)
Java/large-8                      127MB ± 0%     129MB ± 0%  +1.58%  (p=0.000 n=9+8)
Java/large_imbalance-8            131MB ± 0%     133MB ± 0%  +1.54%  (p=0.000 n=9+9)
Java/medium-8                    7.78MB ± 0%    7.88MB ± 0%  +1.26%  (p=0.000 n=10+10)
Java/medium_imbalance-8          8.05MB ± 0%    8.16MB ± 0%  +1.32%  (p=0.000 n=10+10)
Java/small-8                     6.20MB ± 0%    6.28MB ± 0%  +1.32%  (p=0.000 n=10+10)
Java/small_imbalance-8           6.42MB ± 0%    6.51MB ± 0%  +1.38%  (p=0.000 n=10+7)

name                           old allocs/op  new allocs/op  delta
Large-8                             335 ± 0%       335 ± 0%    ~     (p=0.429 n=6+10)
LargeWithExisting-8               18.6k ± 1%     18.6k ± 0%    ~     (p=0.739 n=10+10)
LargeImbalanced-8                   786 ± 5%       786 ± 4%    ~     (p=0.870 n=10+10)
LargeWithExistingImbalanced-8     18.4k ± 0%     18.4k ± 0%    ~     (p=0.518 n=10+7)
Java/large-8                      6.04k ± 0%     6.04k ± 0%    ~     (all equal)
Java/large_imbalance-8            7.43k ± 0%     7.42k ± 0%    ~     (p=0.072 n=9+8)
Java/medium-8                     3.03k ± 0%     3.03k ± 0%    ~     (all equal)
Java/medium_imbalance-8           3.15k ± 0%     3.15k ± 0%    ~     (p=0.322 n=10+10)
Java/small-8                      2.46k ± 0%     2.46k ± 0%    ~     (p=0.591 n=10+9)
Java/small_imbalance-8            2.55k ± 0%     2.55k ± 0%    ~     (p=0.232 n=10+9)
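The heap fix described in the message above is a decrease-key style update inside the Dijkstra loop: when a better class of path (one that steals back to an original owner) reaches an already-queued node, the node is rewritten in place and its heap position repaired. Below is a minimal, self-contained Go sketch of that pattern, with hypothetical node/nodeHeap/relax names; the commit's actual logic lives in findSteal in the diff further down.

package main

import "container/heap"

// node is a pared-down stand-in for pathScore: srcIsOriginal marks whether
// the best-known path to this node steals back an originally-owned
// partition, and heapIdx tracks the node's slot so heap.Fix can reach it.
type node struct {
	srcIsOriginal bool
	distance      int32
	heapIdx       int
}

type nodeHeap []*node

func (h nodeHeap) Len() int { return len(h) }
func (h nodeHeap) Less(i, j int) bool {
	l, r := h[i], h[j]
	if l.srcIsOriginal != r.srcIsOriginal {
		return l.srcIsOriginal // original-owner paths win outright
	}
	return l.distance < r.distance // distance only breaks ties
}
func (h nodeHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].heapIdx, h[j].heapIdx = i, j // keep indices valid for heap.Fix
}
func (h *nodeHeap) Push(x interface{}) {
	n := x.(*node)
	n.heapIdx = len(*h)
	*h = append(*h, n)
}
func (h *nodeHeap) Pop() interface{} {
	old := *h
	n := old[len(old)-1]
	*h = old[:len(old)-1]
	return n
}

// relax upgrades an already-seen node when the new edge would steal a
// partition back to its original owner, then repairs the heap ordering.
// Note that the recorded path may get longer; the preference dominates.
func relax(h *nodeHeap, seen *node, srcIsOriginal bool, distance int32) {
	if !seen.srcIsOriginal && srcIsOriginal {
		seen.srcIsOriginal = true
		seen.distance = distance
		heap.Fix(h, seen.heapIdx)
	}
}

func main() {
	h := &nodeHeap{}
	a, b := &node{distance: 1}, &node{distance: 2}
	heap.Push(h, a)
	heap.Push(h, b)
	relax(h, b, true, 3)    // b's path lengthens but steals back an original partition
	_ = heap.Pop(h).(*node) // pops b first despite its larger distance
}

The commit's real pathScore carries more state per node (parent, srcEdge, level), but the Fix-on-upgrade mechanics are the same.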
1 parent 9ada82d commit d1341ae

3 files changed: +224 -75 lines changed

Diff for: pkg/kgo/internal/sticky/graph.go

+69-27
@@ -14,10 +14,10 @@ type graph struct {
 	out [][]uint32
 
 	// edge => who owns this edge; built in balancer's assignUnassigned
-	cxns []uint16
+	cxns []partitionConsumer
 
 	// scores are all node scores from a seach node. The distance field
-	// is reset on findSteal to noScore.
+	// is reset on findSteal to infinityScore..
 	scores pathScores
 
 	// heapBuf and pathBuf are backing buffers that are reused every
@@ -28,7 +28,7 @@ type graph struct {
 }
 
 func (b *balancer) newGraph(
-	partitionConsumers []uint16,
+	partitionConsumers []partitionConsumer,
 	topicPotentials [][]uint16,
 ) graph {
 	g := graph{
@@ -56,15 +56,15 @@ func (b *balancer) newGraph(
 }
 
 func (g *graph) changeOwnership(edge int32, newDst uint16) {
-	g.cxns[edge] = newDst
+	g.cxns[edge].memberNum = newDst
 }
 
 // findSteal uses Dijkstra search to find a path from the best node it can reach.
 func (g *graph) findSteal(from uint16) ([]stealSegment, bool) {
 	// First, we must reset our scores from any prior run. This is O(M),
 	// but is fast and faster than making a map and extending it a lot.
 	for i := range g.scores {
-		g.scores[i].distance = noScore
+		g.scores[i].distance = infinityScore
 		g.scores[i].done = false
 	}
 
@@ -97,28 +97,40 @@ func (g *graph) findSteal(from uint16) ([]stealSegment, bool) {
 		info := g.b.topicInfos[topicNum]
 		firstPartNum, lastPartNum := info.partNum, info.partNum+info.partitions
 		for edge := firstPartNum; edge < lastPartNum; edge++ {
-			neighborNode := g.cxns[edge]
+			neighborNode := g.cxns[edge].memberNum
 			neighbor, isNew := g.getScore(neighborNode)
 			if neighbor.done {
 				continue
 			}
 
 			distance := current.distance + 1
-			// If our neghbor distance is less or equal, then we can
-			// reach the neighbor through a previous route we have
-			// tried and should not try again.
-			if distance < neighbor.distance {
+
+			// The neighbor is the current node that owns this edge.
+			// If our node originally owned this partition, then it
+			// would be preferable to steal edge back.
+			srcIsOriginal := g.cxns[edge].originalNum == current.node
+
+			// If this is a new neighbor (our first time seeing the neighbor
+			// in our search), this is also the shortest path to reach them,
+			// where shortest defers preference to original sources THEN distance.
+			if isNew {
 				neighbor.parent = current
+				neighbor.srcIsOriginal = srcIsOriginal
 				neighbor.srcEdge = edge
 				neighbor.distance = distance
-				if isNew {
-					heap.Push(rem, neighbor)
-				}
-
-				// We never need to fix the heap position.
-				// Our level is static, and once we set
-				// distance, it is the minimum it will be
-				// and we never revisit the neighbor.
+				neighbor.heapIdx = len(*rem)
+				heap.Push(rem, neighbor)
+
+			} else if !neighbor.srcIsOriginal && srcIsOriginal {
+				// If the search path has seen this neighbor before, but
+				// we now are evaluating a partition that would increase
+				// stickiness if stolen, then fixup the neighbor's parent
+				// and srcEdge.
+				neighbor.parent = current
+				neighbor.srcIsOriginal = true
+				neighbor.srcEdge = edge
+				neighbor.distance = distance
+				heap.Fix(rem, neighbor.heapIdx)
 			}
 		}
 	}
@@ -133,23 +145,39 @@ type stealSegment struct {
 	part int32 // partNum
 }
 
+// As we traverse a graph, we assign each node a path score, which tracks a few
+// numbers for what it would take to reach this node from our first node.
 type pathScore struct {
-	done bool
-	node uint16 // member num
+	// Done is set to true when we pop a node off of the graph. Once we
+	// pop a node, it means we have found a best path to that node and
+	// we do not want to revisit it for processing if any other future
+	// nodes reach back to this one.
+	done bool
+
+	// srcIsOriginal is true if, were our parent to steal srcEdge, would
+	// that put srcEdge back on the original member. That is, if we are B
+	// and our parent is A, does our srcEdge originally belong do A?
+	//
+	// This field exists to work around a very slim edge case where a
+	// partition is stolen by B and then needs to be stolen back by A
+	// later.
+	srcIsOriginal bool
+
+	node     uint16 // our member num
 	distance int32  // how many steals it would take to get here
-	srcEdge int32 // partNum
+	srcEdge  int32  // the partition used to reach us
 	level    int32  // partitions owned on this segment
 	parent   *pathScore
+	heapIdx  int
 }
 
type pathScores []pathScore

const infinityScore = 1<<31 - 1
-const noScore = -1

 func (g *graph) getScore(node uint16) (*pathScore, bool) {
 	r := &g.scores[node]
-	exists := r.distance != noScore
+	exists := r.distance != infinityScore
 	if !exists {
 		*r = pathScore{
 			node: node,
@@ -165,14 +193,28 @@ type pathHeap []*pathScore
 func (p *pathHeap) Len() int { return len(*p) }
 func (p *pathHeap) Swap(i, j int) {
 	h := *p
-	h[i], h[j] = h[j], h[i]
+	l, r := h[i], h[j]
+	l.heapIdx, r.heapIdx = r.heapIdx, l.heapIdx
+	h[i], h[j] = r, l
 }
 
+// For our path, we always want to prioritize stealing a partition we
+// originally owned. This may result in a longer steal path, but it will
+// increase stickiness.
+//
+// Next, our real goal, which is to find a node we can steal from. Because of
+// this, we always want to sort by the highest level. The pathHeap stores
+// reachable paths, so by sorting by the highest level, we terminate quicker:
+// we always check the most likely candidates to quit our search.
+//
+// Finally, we simply prefer searching through shorter paths and, barring that,
+// just sort by node.
 func (p *pathHeap) Less(i, j int) bool {
 	l, r := (*p)[i], (*p)[j]
-	return l.level > r.level || l.level == r.level &&
-		(l.distance < r.distance || l.distance == r.distance &&
-			l.node < r.node)
+	return l.srcIsOriginal && !r.srcIsOriginal || !l.srcIsOriginal && !r.srcIsOriginal &&
+		(l.level > r.level || l.level == r.level &&
+			(l.distance < r.distance || l.distance == r.distance &&
+				l.node < r.node))
 }
 
 func (p *pathHeap) Push(x interface{}) { *p = append(*p, x.(*pathScore)) }
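One way to see the new Less ordering concretely: were one to add a quick test to a _test.go file in this package (hypothetical, not part of this commit), an original-owner path should sort ahead of a path with both a higher level and a shorter distance.

func TestLessPrefersOriginalOwner(t *testing.T) {
	l := &pathScore{srcIsOriginal: true, level: 1, distance: 4}
	r := &pathScore{srcIsOriginal: false, level: 9, distance: 1}
	h := pathHeap{l, r}
	if !h.Less(0, 1) {
		t.Error("expected the original-owner path to sort (and pop) first")
	}
}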

Diff for: pkg/kgo/internal/sticky/sticky.go

+33-48
@@ -132,8 +132,8 @@ func (b *balancer) into() Plan {
 		topics := make(map[string][]int32, ntopics)
 		plan[member] = topics
 
-		// partInfos is created by topic, and partNums refers to
-		// indices in partInfos. If we sort by partNum, we have sorted
+		// partOwners is created by topic, and partNums refers to
+		// indices in partOwners. If we sort by partNum, we have sorted
 		// topics and partitions.
 		sort.Sort(&partNums)
 
@@ -158,7 +158,6 @@ func (b *balancer) into() Plan {
 			topicParts = append(topicParts, int32(partition))
 		}
 		topics[lastTopicInfo.topic] = topicParts[:len(topicParts):len(topicParts)]
-		topicParts = topicParts[len(topicParts):]
 	}
 	return plan
 }
@@ -346,13 +345,6 @@ type memberGeneration struct {
 	generation int32
 }
 
-// for alloc avoidance since it is easy enough.
-type memberGenerations []memberGeneration
-
-func (m *memberGenerations) Len() int { return len(*m) }
-func (m *memberGenerations) Less(i, j int) bool { s := *m; return s[i].generation > s[j].generation }
-func (m *memberGenerations) Swap(i, j int) { s := *m; s[i], s[j] = s[j], s[i] }
-
 type topicPartition struct {
 	topic     string
 	partition int32
@@ -398,7 +390,7 @@ func (b *balancer) assignUnassignedAndInitGraph() {
 	// below in the partition mapping. Doing this two step process allows
 	// for a 10x speed boost rather than ranging over all partitions many
 	// times.
-	membersBufs := make([]uint16, len(b.topicNums)*len(b.members))
+	topicPotentialsBuf := make([]uint16, len(b.topicNums)*len(b.members))
 	topicPotentials := make([][]uint16, len(b.topicNums))
 	for memberNum, member := range b.members {
 		for _, topic := range member.Topics {
@@ -408,8 +400,8 @@ func (b *balancer) assignUnassignedAndInitGraph() {
 			}
 			memberNums := topicPotentials[topicNum]
 			if cap(memberNums) == 0 {
-				memberNums = membersBufs[:0:len(b.members)]
-				membersBufs = membersBufs[len(b.members):]
+				memberNums = topicPotentialsBuf[:0:len(b.members)]
+				topicPotentialsBuf = topicPotentialsBuf[len(b.members):]
 			}
 			topicPotentials[topicNum] = append(memberNums, uint16(memberNum))
 		}
@@ -432,16 +424,15 @@ func (b *balancer) assignUnassignedAndInitGraph() {
 	// Next, over the prior plan, un-map deleted topics or topics that
 	// members no longer want. This is where we determine what is now
 	// unassigned.
-	partitionConsumers := make([]uint16, cap(b.partOwners)) // partNum => consuming member
+	partitionConsumers := make([]partitionConsumer, cap(b.partOwners)) // partNum => consuming member
 	for i := range partitionConsumers {
-		partitionConsumers[i] = unassignedPart
+		partitionConsumers[i] = partitionConsumer{unassignedPart, unassignedPart}
 	}
 	for memberNum := range b.plan {
 		partNums := &b.plan[memberNum]
 		for _, partNum := range *partNums {
 			topicNum := b.partOwners[partNum]
 			if len(topicPotentials[topicNum]) == 0 { // all prior subscriptions stopped wanting this partition
-				partitionConsumers[partNum] = deletedPart
 				partNums.remove(partNum)
 				continue
 			}
@@ -457,7 +448,7 @@ func (b *balancer) assignUnassignedAndInitGraph() {
 				partNums.remove(partNum)
 				continue
 			}
-			partitionConsumers[partNum] = uint16(memberNum)
+			partitionConsumers[partNum] = partitionConsumer{uint16(memberNum), uint16(memberNum)}
 		}
 	}
 
@@ -467,7 +458,7 @@ func (b *balancer) assignUnassignedAndInitGraph() {
 	}
 
 	for partNum, owner := range partitionConsumers {
-		if owner != unassignedPart {
+		if owner.memberNum != unassignedPart {
 			continue
 		}
 		potentials := topicPotentials[b.partOwners[partNum]]
@@ -477,7 +468,7 @@ func (b *balancer) assignUnassignedAndInitGraph() {
 		assigned := potentials[0]
 		b.plan[assigned].add(int32(partNum))
 		(&membersByPartitions{potentials, b.plan}).fix0()
-		partitionConsumers[partNum] = assigned
+		partitionConsumers[partNum].memberNum = assigned
 	}
 
 	// Lastly, with everything assigned, we build our steal graph for
@@ -490,20 +481,9 @@ func (b *balancer) assignUnassignedAndInitGraph() {
 	}
 }
 
-const (
-	// deletedPart and unassignedPart are fake member numbers that we use
-	// to track if a partition is deleted or unassigned.
-	//
-	// deletedPart is technically unneeded; if no member wants a partition,
-	// no member will be seen as a potential for taking it, so tracking
-	// that it was deleted is unnecessary. We do though just to be
-	// explicit.
-	//
-	// unassignedPart is the default of partitions until we process what
-	// members say they were assigned prior.
-	deletedPart    = math.MaxUint16
-	unassignedPart = math.MaxUint16 - 1
-)
+// unassignedPart is a fake member number that we use to track if a partition
+// is deleted or unassigned.
+const unassignedPart = math.MaxUint16 - 1
 
 // tryRestickyStales is a pre-assigning step where, for all stale members,
 // we give partitions back to them if the partition is currently on an
@@ -512,7 +492,7 @@ const (
 // This effectively re-stickies members before we balance further.
 func (b *balancer) tryRestickyStales(
 	topicPotentials [][]uint16,
-	partitionConsumers []uint16,
+	partitionConsumers []partitionConsumer,
 ) {
 	for staleNum, lastOwnerNum := range b.stales {
 		potentials := topicPotentials[b.partOwners[staleNum]] // there must be a potential consumer if we are here
@@ -533,7 +513,7 @@ func (b *balancer) tryRestickyStales(
 			// must be on a different owner (cannot be lastOwner),
 			// otherwise it would not be a lastOwner in the stales
 			// map; it would just be the current owner.
-			currentOwner := partitionConsumers[staleNum]
+			currentOwner := partitionConsumers[staleNum].memberNum
 			lastOwnerPartitions := &b.plan[lastOwnerNum]
 			currentOwnerPartitions := &b.plan[currentOwner]
 			if lastOwnerPartitions.len()+1 < currentOwnerPartitions.len() {
@@ -543,6 +523,11 @@ func (b *balancer) tryRestickyStales(
 	}
 }
 
+type partitionConsumer struct {
+	memberNum   uint16
+	originalNum uint16
+}
+
 // While assigning, we keep members per topic heap sorted by the number of
 // partitions they are currently consuming. This allows us to have quick
 // assignment vs. always scanning to see the min loaded member.
@@ -605,14 +590,14 @@ func (b *balancer) balance() {
 		return
 	}
 
-	minRem := min.members
-	maxRem := max.members
-	for len(minRem) > 0 && len(maxRem) > 0 {
-		dst := minRem[0]
-		src := maxRem[0]
+	minMems := min.members
+	maxMems := max.members
+	for len(minMems) > 0 && len(maxMems) > 0 {
+		dst := minMems[0]
+		src := maxMems[0]
 
-		minRem = minRem[1:]
-		maxRem = maxRem[1:]
+		minMems = minMems[1:]
+		maxMems = maxMems[1:]
 
 		srcPartitions := &b.plan[src]
 		dstPartitions := &b.plan[dst]
@@ -623,14 +608,14 @@ func (b *balancer) balance() {
 	nextUp := b.findLevel(min.level + 1)
 	nextDown := b.findLevel(max.level - 1)
 
-	upEnd := len(min.members) - len(minRem)
-	downEnd := len(max.members) - len(maxRem)
+	endOfUps := len(min.members) - len(minMems)
+	endOfDowns := len(max.members) - len(maxMems)
 
-	nextUp.members = append(nextUp.members, min.members[:upEnd]...)
-	nextDown.members = append(nextDown.members, max.members[:downEnd]...)
+	nextUp.members = append(nextUp.members, min.members[:endOfUps]...)
+	nextDown.members = append(nextDown.members, max.members[:endOfDowns]...)
 
-	min.members = min.members[upEnd:]
-	max.members = max.members[downEnd:]
+	min.members = min.members[endOfUps:]
+	max.members = max.members[endOfDowns:]
 
 	if len(min.members) == 0 {
 		b.planByNumPartitions.Delete(b.planByNumPartitions.Min())
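On what the two fields of partitionConsumer buy: changeOwnership in graph.go only ever rewrites memberNum, so originalNum stays pinned to the member that reported the partition in the prior plan, which is exactly what findSteal's srcIsOriginal check consults. A tiny standalone toy illustrating the invariant (struct copied from the diff, scenario invented):

package main

import "fmt"

// partitionConsumer mirrors the struct added in this commit: memberNum is
// the current owner and changes on every steal; originalNum is set once.
type partitionConsumer struct {
	memberNum   uint16
	originalNum uint16
}

func main() {
	pc := partitionConsumer{memberNum: 2, originalNum: 2} // member 2 owned this partition in the prior plan
	pc.memberNum = 5                                      // member 5 steals it (what changeOwnership does)
	fmt.Println(pc.originalNum == 2)                      // true: stealing it back to member 2 restores stickiness
}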
