Skip to content

Commit d9c34c0

Browse files
committed
complex sticky: remove A* and just use breadth first
After further thought, the heuristic function was not entirely accurate yet. Our search was operating on the number of steals, but the heuristic was returning how beneficial it would be to steal a level. A more accurate guess from any node would be 1 unless the node happens to be a target node.

I benchmarked that and it was marginally better than the previous code, but just removing the heuristic leads to a better improvement and keeps things simpler, so we may as well just remove the heuristic.

Removing the heuristic allows us to remove fscore and simplifies the heap comparison logic, which actually gives us a speed boost.

name                           old time/op    new time/op    delta
Large-8                        2.15ms ± 1%    2.15ms ± 1%    ~        (p=0.968 n=9+10)
LargeWithExisting-8            8.67ms ± 1%    8.64ms ± 2%    ~        (p=0.079 n=9+10)
LargeImbalanced-8              2.93ms ± 4%    2.88ms ± 2%    ~        (p=0.089 n=10+10)
LargeWithExistingImbalanced-8  8.75ms ± 1%    8.69ms ± 1%    -0.65%   (p=0.002 n=9+9)
Java/large-8                   159ms ± 1%     158ms ± 1%     -0.76%   (p=0.023 n=10+10)
Java/large_imbalance-8         197ms ± 3%     188ms ± 2%     -4.38%   (p=0.000 n=10+9)
Java/medium-8                  8.94ms ± 1%    8.86ms ± 1%    ~        (p=0.053 n=10+9)
Java/medium_imbalance-8        10.5ms ± 2%    10.3ms ± 1%    -2.38%   (p=0.000 n=10+8)
Java/small-8                   6.81ms ± 2%    6.80ms ± 1%    ~        (p=0.604 n=10+9)
Java/small_imbalance-8         8.43ms ± 1%    8.27ms ± 1%    -1.94%   (p=0.000 n=9+6)

name                           old alloc/op   new alloc/op   delta
Large-8                        1.92MB ± 0%    1.92MB ± 0%    ~        (p=0.060 n=10+10)
LargeWithExisting-8            3.69MB ± 0%    3.69MB ± 0%    ~        (p=0.661 n=9+10)
LargeImbalanced-8              2.12MB ± 1%    2.12MB ± 1%    ~        (p=0.912 n=10+10)
LargeWithExistingImbalanced-8  3.72MB ± 0%    3.72MB ± 0%    ~        (p=0.071 n=8+10)
Java/large-8                   127MB ± 0%     127MB ± 0%     ~        (p=0.069 n=10+9)
Java/large_imbalance-8         131MB ± 0%     131MB ± 0%     -0.01%   (p=0.000 n=8+9)
Java/medium-8                  7.78MB ± 0%    7.78MB ± 0%    ~        (p=0.210 n=10+10)
Java/medium_imbalance-8        8.06MB ± 0%    8.05MB ± 0%    -0.10%   (p=0.000 n=10+10)
Java/small-8                   6.20MB ± 0%    6.20MB ± 0%    ~        (p=0.109 n=10+10)
Java/small_imbalance-8         6.42MB ± 0%    6.42MB ± 0%    -0.10%   (p=0.000 n=10+10)

name                           old allocs/op  new allocs/op  delta
Large-8                        335 ± 0%       335 ± 0%       ~        (p=0.961 n=10+8)
LargeWithExisting-8            18.5k ± 1%     18.6k ± 1%     ~        (p=0.119 n=10+10)
LargeImbalanced-8              785 ± 7%       792 ± 7%       ~        (p=0.631 n=10+10)
LargeWithExistingImbalanced-8  18.4k ± 1%     18.5k ± 0%     ~        (p=0.085 n=10+10)
Java/large-8                   6.04k ± 0%     6.04k ± 0%     ~        (all equal)
Java/large_imbalance-8         7.43k ± 0%     7.43k ± 0%     ~        (p=0.678 n=8+9)
Java/medium-8                  3.03k ± 0%     3.03k ± 0%     ~        (all equal)
Java/medium_imbalance-8        3.15k ± 0%     3.15k ± 0%     ~        (p=0.670 n=10+10)
Java/small-8                   2.46k ± 0%     2.46k ± 0%     ~        (p=0.108 n=10+9)
Java/small_imbalance-8         2.55k ± 0%     2.55k ± 0%     ~        (p=0.493 n=10+10)
1 parent 2c473c4 commit d9c34c0

File tree

1 file changed

+24
-42
lines changed

1 file changed

+24
-42
lines changed

pkg/kgo/internal/sticky/graph.go

+24-42
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type graph struct {
1616
// edge => who owns this edge; built in balancer's assignUnassigned
1717
cxns []uint16
1818

19-
// scores are all node scores from a search node. The gscore field
19+
// scores are all node scores from a search node. The distance field
2020
// is reset on findSteal to noScore.
2121
scores pathScores
2222

@@ -59,19 +59,18 @@ func (g *graph) changeOwnership(edge int32, newDst uint16) {
5959
g.cxns[edge] = newDst
6060
}
6161

62-
// findSteal uses A* search to find a path from the best node it can reach.
62+
// findSteal uses Dijkstra search to find a path from the best node it can reach.
6363
func (g *graph) findSteal(from uint16) ([]stealSegment, bool) {
6464
// First, we must reset our scores from any prior run. This is O(M),
6565
// but is fast and faster than making a map and extending it a lot.
6666
for i := range g.scores {
67-
g.scores[i].gscore = noScore
67+
g.scores[i].distance = noScore
6868
g.scores[i].done = false
6969
}
7070

7171
first, _ := g.getScore(from)
7272

73-
first.gscore = 0
74-
first.fscore = h(first, first)
73+
first.distance = 0
7574
first.done = true
7675

7776
g.heapBuf = append(g.heapBuf[:0], first)
@@ -104,22 +103,22 @@ func (g *graph) findSteal(from uint16) ([]stealSegment, bool) {
104103
continue
105104
}
106105

107-
gscore := current.gscore + 1
108-
// If our neighbor gscore is less or equal, then we can
106+
distance := current.distance + 1
107+
// If our neighbor distance is less or equal, then we can
109108
// reach the neighbor through a previous route we have
110109
// tried and should not try again.
111-
if gscore < neighbor.gscore {
110+
if distance < neighbor.distance {
112111
neighbor.parent = current
113112
neighbor.srcEdge = edge
114-
neighbor.gscore = gscore
115-
neighbor.fscore = gscore + h(first, neighbor)
113+
neighbor.distance = distance
116114
if isNew {
117115
heap.Push(rem, neighbor)
118116
}
117+
119118
// We never need to fix the heap position.
120-
// Our level and fscore is static, and once
121-
// we set gscore, it is the minumum it will be
122-
// and we never revisit this neighbor.
119+
// Our level is static, and once we set
120+
// distance, it is the minimum it will be
121+
// and we never revisit the neighbor.
123122
}
124123
}
125124
}
@@ -135,43 +134,27 @@ type stealSegment struct {
135134
}
136135

137136
type pathScore struct {
138-
done bool
139-
node uint16 // member num
140-
parent *pathScore
141-
srcEdge int32 // partNum
142-
level int32 // partitions owned on this segment
143-
gscore int32 // how many steals it would take to get here
144-
fscore int32
137+
done bool
138+
node uint16 // member num
139+
distance int32 // how many steals it would take to get here
140+
srcEdge int32 // partNum
141+
level int32 // partitions owned on this segment
142+
parent *pathScore
145143
}
146144

147145
type pathScores []pathScore
148146

149147
const infinityScore = 1<<31 - 1
150148
const noScore = -1
151149

152-
// For A*, if we never overestimate (with h), then the path we find is
153-
// optimal. A true estimation of our distance to any node is the node's
154-
// level minus ours.
155-
//
156-
// At worst, our target must be +2 levels from us. So, our estimation
157-
// any node can be our level, +2, minus theirs.
158-
func h(first, target *pathScore) int32 {
159-
r := first.level + 2 - target.level
160-
if r < 0 {
161-
return 0
162-
}
163-
return r
164-
}
165-
166150
func (g *graph) getScore(node uint16) (*pathScore, bool) {
167151
r := &g.scores[node]
168-
exists := r.gscore != noScore
152+
exists := r.distance != noScore
169153
if !exists {
170154
*r = pathScore{
171-
node: node,
172-
level: int32(len(g.b.plan[node])),
173-
gscore: infinityScore,
174-
fscore: infinityScore,
155+
node: node,
156+
level: int32(len(g.b.plan[node])),
157+
distance: infinityScore,
175158
}
176159
}
177160
return r, !exists
@@ -188,9 +171,8 @@ func (p *pathHeap) Swap(i, j int) {
188171
func (p *pathHeap) Less(i, j int) bool {
189172
l, r := (*p)[i], (*p)[j]
190173
return l.level > r.level || l.level == r.level &&
191-
(l.fscore < r.fscore || l.fscore == r.fscore &&
192-
(l.gscore < r.gscore || l.gscore == r.gscore &&
193-
l.node < r.node))
174+
(l.distance < r.distance || l.distance == r.distance &&
175+
l.node < r.node)
194176
}
195177

196178
func (p *pathHeap) Push(x interface{}) { *p = append(*p, x.(*pathScore)) }

0 commit comments

Comments
 (0)