Skip to content

Commit e168855

Browse files
committed
sticky: fix and drastically simplify isComplex detection
Before this commit, if a sticky balance with tons of disparate members only passed in **one** topic, then the code was unable to detect if the balance was complex, because we only detected complexity after the first topic. Now, we simply check if all members want to consume each topic equally. topics2memberNums is built from topics each member is interested in, so if any topic one member is interested in is not interested in by all members, then we know we are complex.
1 parent bd5d5ad commit e168855

File tree

2 files changed

+25
-15
lines changed

2 files changed

+25
-15
lines changed

pkg/kgo/internal/sticky/sticky.go

+9-15
Original file line numberDiff line numberDiff line change
@@ -404,29 +404,23 @@ func (b *balancer) assignUnassignedAndInitGraph() {
404404
}
405405

406406
partitionPotentials := make([][]uint16, cap(b.partNames)) // for each partition, who can consume it?
407-
var firstTopicMembers []uint16
408407
for topic, topicMembers := range topics2memberNums {
409408
for partition := int32(0); partition < b.topics[topic]; partition++ {
410409
tp := topicPartition{topic, partition}
411410
partNum := b.partNum(tp)
412411
partitionPotentials[partNum] = topicMembers
413412
}
414413

415-
// While building partition potentials, we can check whether
416-
// all topics are consumed the same.
417-
if firstTopicMembers == nil {
418-
firstTopicMembers = topicMembers
419-
continue
420-
}
421-
if b.isComplex || len(topicMembers) != len(firstTopicMembers) {
414+
// If the number of members interested in this topic is not the
415+
// same as the number of members in this group, then **other**
416+
// members are interested in other topics and not this one, and
417+
// we must go to complex balancing.
418+
//
419+
// We could accidentally fall into isComplex if any member is
420+
// not interested in anything, but realistically we do not
421+
// expect members to join with no interests.
422+
if len(topicMembers) != len(b.members) {
422423
b.isComplex = true
423-
continue
424-
}
425-
for i, memberNum := range topicMembers {
426-
if memberNum != firstTopicMembers[i] {
427-
b.isComplex = true
428-
break
429-
}
430424
}
431425
}
432426

pkg/kgo/internal/sticky/sticky_test.go

+16
Original file line numberDiff line numberDiff line change
@@ -881,6 +881,22 @@ func TestImbalanced(t *testing.T) {
881881
},
882882
},
883883

884+
{
885+
name: "unequal",
886+
members: []GroupMember{
887+
{ID: "0", Topics: []string{"0"}},
888+
{ID: "1", Topics: []string{"1"}},
889+
},
890+
topics: map[string]int32{
891+
"1": 2,
892+
},
893+
nsticky: 0,
894+
balance: map[int]resultOptions{
895+
0: {[]string{"0"}, 1},
896+
2: {[]string{"1"}, 1},
897+
},
898+
},
899+
884900
//
885901
} {
886902
t.Run(test.name, func(t *testing.T) {

0 commit comments

Comments
 (0)