@@ -4,7 +4,6 @@ package sarama
4
4
5
5
import (
6
6
"errors"
7
- "github.com/stretchr/testify/assert"
8
7
"log"
9
8
"math"
10
9
"os"
@@ -18,6 +17,7 @@ import (
18
17
19
18
"github.com/fortytw2/leaktest"
20
19
"github.com/rcrowley/go-metrics"
20
+ "github.com/stretchr/testify/assert"
21
21
"github.com/stretchr/testify/require"
22
22
)
23
23
@@ -638,6 +638,68 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
638
638
}
639
639
}
640
640
641
+ func TestAsyncProducerWithExponentialBackoffDurations (t * testing.T ) {
642
+ var backoffDurations []time.Duration
643
+ var mu sync.Mutex
644
+
645
+ topic := "my_topic"
646
+ maxBackoff := 2 * time .Second
647
+ config := NewTestConfig ()
648
+
649
+ innerBackoffFunc := NewExponentialBackoff (defaultRetryBackoff , maxBackoff )
650
+ backoffFunc := func (retries , maxRetries int ) time.Duration {
651
+ duration := innerBackoffFunc (retries , maxRetries )
652
+ mu .Lock ()
653
+ backoffDurations = append (backoffDurations , duration )
654
+ mu .Unlock ()
655
+ return duration
656
+ }
657
+
658
+ config .Producer .Flush .Messages = 5
659
+ config .Producer .Return .Successes = true
660
+ config .Producer .Retry .Max = 3
661
+ config .Producer .Retry .BackoffFunc = backoffFunc
662
+
663
+ broker := NewMockBroker (t , 1 )
664
+
665
+ metadataResponse := new (MetadataResponse )
666
+ metadataResponse .AddBroker (broker .Addr (), broker .BrokerID ())
667
+ metadataResponse .AddTopicPartition (topic , 0 , broker .BrokerID (), nil , nil , nil , ErrNoError )
668
+ broker .Returns (metadataResponse )
669
+
670
+ producer , err := NewAsyncProducer ([]string {broker .Addr ()}, config )
671
+ if err != nil {
672
+ t .Fatal (err )
673
+ }
674
+
675
+ failResponse := new (ProduceResponse )
676
+ failResponse .AddTopicPartition (topic , 0 , ErrNotLeaderForPartition )
677
+ successResponse := new (ProduceResponse )
678
+ successResponse .AddTopicPartition (topic , 0 , ErrNoError )
679
+
680
+ broker .Returns (failResponse )
681
+ broker .Returns (metadataResponse )
682
+ broker .Returns (failResponse )
683
+ broker .Returns (metadataResponse )
684
+ broker .Returns (successResponse )
685
+
686
+ for i := 0 ; i < 5 ; i ++ {
687
+ producer .Input () <- & ProducerMessage {Topic : topic , Value : StringEncoder ("test" )}
688
+ }
689
+
690
+ expectResults (t , producer , 5 , 0 )
691
+ closeProducer (t , producer )
692
+ broker .Close ()
693
+
694
+ assert .Greater (t , backoffDurations [0 ], time .Duration (0 ),
695
+ "Expected first backoff duration to be greater than 0" )
696
+ for i := 1 ; i < len (backoffDurations ); i ++ {
697
+ assert .Greater (t , backoffDurations [i ], time .Duration (0 ))
698
+ assert .GreaterOrEqual (t , backoffDurations [i ], backoffDurations [i - 1 ])
699
+ assert .LessOrEqual (t , backoffDurations [i ], maxBackoff )
700
+ }
701
+ }
702
+
641
703
// https://github.com/IBM/sarama/issues/2129
642
704
func TestAsyncProducerMultipleRetriesWithConcurrentRequests (t * testing.T ) {
643
705
// Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
0 commit comments