Skip to content

Commit ac48413

Browse files
committed
feat: exponential backoff for clients
Signed-off-by: Wenli Wan <[email protected]>
1 parent 9ae475a commit ac48413

File tree

3 files changed

+138
-1
lines changed

3 files changed

+138
-1
lines changed

async_producer_test.go

+60
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,66 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
638638
}
639639
}
640640

641+
func TestAsyncProducerWithExponentialBackoffDurations(t *testing.T) {
642+
var backoffDurations []time.Duration
643+
var mu sync.Mutex
644+
645+
topic := "my_topic"
646+
maxBackoff := 2 * time.Second
647+
config := NewTestConfig()
648+
649+
backoffFunc := func(retries, maxRetries int) time.Duration {
650+
duration := NewExponentialBackoff(config.Producer.Retry.Backoff, maxBackoff)(retries, maxRetries)
651+
mu.Lock()
652+
backoffDurations = append(backoffDurations, duration)
653+
mu.Unlock()
654+
return duration
655+
}
656+
657+
config.Producer.Flush.MaxMessages = 1
658+
config.Producer.Return.Successes = true
659+
config.Producer.Retry.Max = 4
660+
config.Producer.Retry.BackoffFunc = backoffFunc
661+
662+
broker := NewMockBroker(t, 1)
663+
664+
metadataResponse := new(MetadataResponse)
665+
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
666+
metadataResponse.AddTopicPartition(topic, 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
667+
broker.Returns(metadataResponse)
668+
669+
producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
670+
if err != nil {
671+
t.Fatal(err)
672+
}
673+
674+
failResponse := new(ProduceResponse)
675+
failResponse.AddTopicPartition(topic, 0, ErrNotLeaderForPartition)
676+
successResponse := new(ProduceResponse)
677+
successResponse.AddTopicPartition(topic, 0, ErrNoError)
678+
679+
broker.Returns(failResponse)
680+
broker.Returns(metadataResponse)
681+
broker.Returns(failResponse)
682+
broker.Returns(metadataResponse)
683+
broker.Returns(successResponse)
684+
685+
producer.Input() <- &ProducerMessage{Topic: topic, Value: StringEncoder("test")}
686+
687+
expectResults(t, producer, 1, 0)
688+
closeProducer(t, producer)
689+
broker.Close()
690+
691+
for i := 1; i < len(backoffDurations); i++ {
692+
if backoffDurations[i] < backoffDurations[i-1] {
693+
t.Errorf("expected backoff[%d] >= backoff[%d], got %v < %v", i, i-1, backoffDurations[i], backoffDurations[i-1])
694+
}
695+
if backoffDurations[i] > maxBackoff {
696+
t.Errorf("backoff exceeded max: %v", backoffDurations[i])
697+
}
698+
}
699+
}
700+
641701
// https://github.com/IBM/sarama/issues/2129
642702
func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) {
643703
// Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

utils.go

+36
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ package sarama
33
import (
44
"bufio"
55
"fmt"
6+
"math/rand"
67
"net"
78
"regexp"
9+
"sync"
10+
"time"
811
)
912

1013
type none struct{}
@@ -344,3 +347,36 @@ func (v KafkaVersion) String() string {
344347

345348
return fmt.Sprintf("%d.%d.%d", v.version[0], v.version[1], v.version[2])
346349
}
350+
351+
// NewExponentialBackoff returns a function that implements an exponential backoff strategy with jitter.
// It follows KIP-580, implementing the formula:
// MIN(retry.backoff.max.ms, (retry.backoff.ms * 2**(retries - 1)) * random(0.8, 1.2))
// This ensures retries start with `backoff` and exponentially increase until `maxBackoff`, with added jitter.
//
// The returned function takes the current retry count and the maximum number
// of retries (the latter is not used by this strategy) and returns how long to
// wait before the next attempt. It is safe for concurrent use.
//
// For retries <= 0 the un-jittered `backoff` is returned. The result is always
// capped at `maxBackoff`, no matter how large `retries` is: the exponential
// term saturates instead of overflowing. A non-positive `backoff` is never
// scaled, only jittered.
//
// Example usage:
//
//	backoffFunc := NewExponentialBackoff(config.Producer.Retry.Backoff, 2*time.Second)
//	config.Producer.Retry.BackoffFunc = backoffFunc
func NewExponentialBackoff(backoff time.Duration, maxBackoff time.Duration) func(retries, maxRetries int) time.Duration {
	// Largest representable duration; used to saturate instead of wrapping.
	const maxDuration = time.Duration(1<<63 - 1)

	// A *rand.Rand created by rand.New is not safe for concurrent use, so
	// every draw is serialized through rngMu.
	var rngMu sync.Mutex
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	return func(retries, maxRetries int) time.Duration {
		if retries <= 0 {
			if backoff > maxBackoff {
				return maxBackoff
			}
			return backoff
		}

		// Compute backoff * 2**(retries-1) by repeated doubling. A plain
		// shift (1 << (retries-1)) wraps to a negative value or to zero once
		// retries reaches the word size, which would slip under the cap.
		calculatedBackoff := backoff
		for i := 1; i < retries; i++ {
			// Once the value is at least twice the cap, even the smallest
			// jitter factor (0.8) keeps it above the cap, so further
			// doubling cannot change the result.
			if calculatedBackoff <= 0 || calculatedBackoff/2 >= maxBackoff {
				break
			}
			if calculatedBackoff > maxDuration/2 {
				calculatedBackoff = maxDuration
				break
			}
			calculatedBackoff *= 2
		}

		rngMu.Lock()
		jitter := 0.8 + 0.4*rng.Float64()
		rngMu.Unlock()

		// Compare in floating point before converting back: converting an
		// out-of-range float to an integer type is implementation-defined.
		jittered := float64(calculatedBackoff) * jitter
		if jittered >= float64(maxBackoff) {
			return maxBackoff
		}
		return time.Duration(jittered)
	}
}

utils_test.go

+42-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33
package sarama
44

5-
import "testing"
5+
import (
6+
"sync"
7+
"testing"
8+
"time"
9+
)
610

711
func TestVersionCompare(t *testing.T) {
812
if V0_8_2_0.IsAtLeast(V0_8_2_1) {
@@ -95,3 +99,40 @@ func TestVersionParsing(t *testing.T) {
9599
}
96100
}
97101
}
102+
103+
func TestExponentialBackoffCorrectness(t *testing.T) {
104+
backoffFunc := NewExponentialBackoff(100*time.Millisecond, 2*time.Second)
105+
testCases := []struct {
106+
retries int
107+
maxRetries int
108+
minBackoff time.Duration
109+
maxBackoff time.Duration
110+
}{
111+
{0, 5, 100 * time.Millisecond, 100 * time.Millisecond},
112+
{1, 5, 80 * time.Millisecond, 120 * time.Millisecond},
113+
{3, 5, 320 * time.Millisecond, 480 * time.Millisecond},
114+
{5, 5, 1280 * time.Millisecond, 2 * time.Second},
115+
}
116+
117+
for _, tc := range testCases {
118+
backoff := backoffFunc(tc.retries, tc.maxRetries)
119+
if backoff < tc.minBackoff || backoff > tc.maxBackoff {
120+
t.Errorf("retries=%d: expected backoff between %v and %v, got %v", tc.retries, tc.minBackoff, tc.maxBackoff, backoff)
121+
}
122+
}
123+
}
124+
125+
func TestExponentialBackoffRaceDetection(t *testing.T) {
126+
backoffFunc := NewExponentialBackoff(100*time.Millisecond, 2*time.Second)
127+
var wg sync.WaitGroup
128+
concurrency := 1000
129+
130+
wg.Add(concurrency)
131+
for i := 0; i < concurrency; i++ {
132+
go func(i int) {
133+
defer wg.Done()
134+
_ = backoffFunc(i%10, 5)
135+
}(i)
136+
}
137+
wg.Wait()
138+
}

0 commit comments

Comments
 (0)