From 9127f1c432c08b9a766fdb0931bfd0da80dddde0 Mon Sep 17 00:00:00 2001 From: nap_all_day <44193366+napallday@users.noreply.github.com> Date: Mon, 27 Mar 2023 22:19:28 +0800 Subject: [PATCH] fix: data race in balance strategy (#2453) --- balance_strategy.go | 62 ++++++++++++++++++++----------- balance_strategy_test.go | 25 +++++++++++-- config.go | 6 +-- config_test.go | 2 +- examples/consumergroup/main.go | 6 +-- examples/exactly_once/main.go | 2 +- functional_consumer_group_test.go | 2 +- 7 files changed, 70 insertions(+), 35 deletions(-) diff --git a/balance_strategy.go b/balance_strategy.go index cf6e398c0..ec0a05767 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -57,7 +57,8 @@ type BalanceStrategy interface { // -------------------------------------------------------------------- -// BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members. +// NewBalanceStrategyRange returns a range balance strategy, +// which is the default and assigns partitions as ranges to consumer group members. // This follows the same logic as // https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html // @@ -65,27 +66,33 @@ type BalanceStrategy interface { // // M1: {T1: [0, 1, 2], T2: [0, 1, 2]} // M2: {T2: [3, 4, 5], T2: [3, 4, 5]} -var BalanceStrategyRange = &balanceStrategy{ - name: RangeBalanceStrategyName, - coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) { - partitionsPerConsumer := len(partitions) / len(memberIDs) - consumersWithExtraPartition := len(partitions) % len(memberIDs) - - sort.Strings(memberIDs) - - for i, memberID := range memberIDs { - min := i*partitionsPerConsumer + int(math.Min(float64(consumersWithExtraPartition), float64(i))) - extra := 0 - if i < consumersWithExtraPartition { - extra = 1 +func NewBalanceStrategyRange() BalanceStrategy { + return &balanceStrategy{ + name: RangeBalanceStrategyName, + coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) { + partitionsPerConsumer := len(partitions) / len(memberIDs) + consumersWithExtraPartition := len(partitions) % len(memberIDs) + + sort.Strings(memberIDs) + + for i, memberID := range memberIDs { + min := i*partitionsPerConsumer + int(math.Min(float64(consumersWithExtraPartition), float64(i))) + extra := 0 + if i < consumersWithExtraPartition { + extra = 1 + } + max := min + partitionsPerConsumer + extra + plan.Add(memberID, topic, partitions[min:max]...) } - max := min + partitionsPerConsumer + extra - plan.Add(memberID, topic, partitions[min:max]...) - } - }, + }, + } } -// BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments +// Deprecated: use NewBalanceStrategyRange to avoid data race issue +var BalanceStrategyRange = NewBalanceStrategyRange() + +// NewBalanceStrategySticky returns a sticky balance strategy, +// which assigns partitions to members with an attempt to preserve earlier assignments // while maintain a balanced partition distribution. // Example with topic T with six partitions (0..5) and two members (M1, M2): // @@ -97,7 +104,12 @@ var BalanceStrategyRange = &balanceStrategy{ // M1: {T: [0, 2]} // M2: {T: [1, 3]} // M3: {T: [4, 5]} -var BalanceStrategySticky = &stickyBalanceStrategy{} +func NewBalanceStrategySticky() BalanceStrategy { + return &stickyBalanceStrategy{} +} + +// Deprecated: use NewBalanceStrategySticky to avoid data race issue +var BalanceStrategySticky = NewBalanceStrategySticky() // -------------------------------------------------------------------- @@ -331,11 +343,17 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart } } -// BalanceStrategyRoundRobin assigns partitions to members in alternating order. +// NewBalanceStrategyRoundRobin returns a round-robin balance strategy, +// which assigns partitions to members in alternating order. // For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2): // M0: [t0p0, t0p2, t1p1] // M1: [t0p1, t1p0, t1p2] -var BalanceStrategyRoundRobin = new(roundRobinBalancer) +func NewBalanceStrategyRoundRobin() BalanceStrategy { + return new(roundRobinBalancer) +} + +// Deprecated: use NewBalanceStrategyRoundRobin to avoid data race issue +var BalanceStrategyRoundRobin = NewBalanceStrategyRoundRobin() type roundRobinBalancer struct{} diff --git a/balance_strategy_test.go b/balance_strategy_test.go index 452a6a398..f397a9073 100644 --- a/balance_strategy_test.go +++ b/balance_strategy_test.go @@ -73,7 +73,7 @@ func TestBalanceStrategyRange(t *testing.T) { }, } - strategy := BalanceStrategyRange + strategy := NewBalanceStrategyRange() if strategy.Name() != "range" { t.Errorf("Unexpected stategy name\nexpected: range\nactual: %v", strategy.Name()) } @@ -96,7 +96,7 @@ func TestBalanceStrategyRange(t *testing.T) { } func TestBalanceStrategyRangeAssignmentData(t *testing.T) { - strategy := BalanceStrategyRange + strategy := NewBalanceStrategyRange() members := make(map[string]ConsumerGroupMemberMetadata, 2) members["consumer1"] = ConsumerGroupMemberMetadata{ @@ -177,7 +177,7 @@ func TestBalanceStrategyRoundRobin(t *testing.T) { }, } - strategy := BalanceStrategyRoundRobin + strategy := NewBalanceStrategyRoundRobin() if strategy.Name() != "roundrobin" { t.Errorf("Unexpected strategy name\nexpected: roundrobin\nactual: %v", strategy.Name()) } @@ -284,7 +284,7 @@ func Test_deserializeTopicPartitionAssignment(t *testing.T) { } func TestBalanceStrategyRoundRobinAssignmentData(t *testing.T) { - strategy := BalanceStrategyRoundRobin + strategy := NewBalanceStrategyRoundRobin() members := make(map[string]ConsumerGroupMemberMetadata, 2) members["consumer1"] = ConsumerGroupMemberMetadata{ @@ -2094,6 +2094,23 @@ func Test_stickyBalanceStrategy_Plan_AssignmentData(t *testing.T) { } } +func Test_stickyBalanceStrategy_Plan_data_race(t *testing.T) { + for i := 0; i < 1000; i++ { + go func(bs BalanceStrategy) { + members := map[string]ConsumerGroupMemberMetadata{ + "m1": { + Version: 3, + Topics: []string{"topic"}, + }, + } + topics := map[string][]int32{ + "topic": {0, 1, 2}, + } + _, _ = bs.Plan(members, topics) + }(NewBalanceStrategySticky()) + } +} + func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopics(b *testing.B) { s := &stickyBalanceStrategy{} r := rand.New(rand.NewSource(time.Now().UnixNano())) diff --git a/config.go b/config.go index b07034434..1933af3fd 100644 --- a/config.go +++ b/config.go @@ -294,7 +294,7 @@ type Config struct { Interval time.Duration } Rebalance struct { - // Strategy for allocating topic partitions to members (default BalanceStrategyRange) + // Strategy for allocating topic partitions to members. // Deprecated: Strategy exists for historical compatibility // and should not be used. Please use GroupStrategies. Strategy BalanceStrategy @@ -302,7 +302,7 @@ type Config struct { // GroupStrategies is the priority-ordered list of client-side consumer group // balancing strategies that will be offered to the coordinator. The first // strategy that all group members support will be chosen by the leader. - // default: [BalanceStrategyRange] + // default: [ NewBalanceStrategyRange() ] GroupStrategies []BalanceStrategy // The maximum allowed time for each worker to join the group once a rebalance has begun. @@ -539,7 +539,7 @@ func NewConfig() *Config { c.Consumer.Group.Session.Timeout = 10 * time.Second c.Consumer.Group.Heartbeat.Interval = 3 * time.Second - c.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{BalanceStrategyRange} + c.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{NewBalanceStrategyRange()} c.Consumer.Group.Rebalance.Timeout = 60 * time.Second c.Consumer.Group.Rebalance.Retry.Max = 4 c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second diff --git a/config_test.go b/config_test.go index a97ca1885..1f29ee249 100644 --- a/config_test.go +++ b/config_test.go @@ -548,7 +548,7 @@ func TestGroupInstanceIdAndVersionValidation(t *testing.T) { func TestConsumerGroupStrategyCompatibility(t *testing.T) { config := NewTestConfig() - config.Consumer.Group.Rebalance.Strategy = BalanceStrategySticky + config.Consumer.Group.Rebalance.Strategy = NewBalanceStrategySticky() if err := config.Validate(); err != nil { t.Error("Expected passing config validation, got ", err) } diff --git a/examples/consumergroup/main.go b/examples/consumergroup/main.go index d7d2f916b..88606b5ff 100644 --- a/examples/consumergroup/main.go +++ b/examples/consumergroup/main.go @@ -70,11 +70,11 @@ func main() { switch assignor { case "sticky": - config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky} + config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()} case "roundrobin": - config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin} + config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} case "range": - config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRange} + config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()} default: log.Panicf("Unrecognized consumer group partition assignor: %s", assignor) } diff --git a/examples/exactly_once/main.go b/examples/exactly_once/main.go index 61fbc04f2..70207c887 100644 --- a/examples/exactly_once/main.go +++ b/examples/exactly_once/main.go @@ -76,7 +76,7 @@ func main() { config.Consumer.IsolationLevel = sarama.ReadCommitted config.Consumer.Offsets.AutoCommit.Enable = false - config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin + config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin() if oldest { config.Consumer.Offsets.Initial = sarama.OffsetOldest diff --git a/functional_consumer_group_test.go b/functional_consumer_group_test.go index 6be383f0c..f05666185 100644 --- a/functional_consumer_group_test.go +++ b/functional_consumer_group_test.go @@ -552,7 +552,7 @@ func (m *testFuncConsumerGroupMember) loop(topics []string) { func newTestStatefulStrategy(t *testing.T) *testStatefulStrategy { return &testStatefulStrategy{ - BalanceStrategy: BalanceStrategyRange, + BalanceStrategy: NewBalanceStrategyRange(), t: t, } }