Skip to content

Commit 66e60c7

Browse files
authored
fix(consumer): don't retry FindCoordinator forever (#2427)
If the consumer group hit an error finding the coordinator whilst setting up a session it would end up retrying forever because the retry count didn't get decremented and there was no exit of the loop. Fix that up and add a unittest to cover the scenario Fixes #2426
1 parent 6acb276 commit 66e60c7

File tree

2 files changed

+49
-1
lines changed

2 files changed

+49
-1
lines changed

consumer_group.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,10 @@ func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, ha
252252
if refreshCoordinator {
253253
err := c.client.RefreshCoordinator(c.groupID)
254254
if err != nil {
255-
return c.retryNewSession(ctx, topics, handler, retries, true)
255+
if retries <= 0 {
256+
return nil, err
257+
}
258+
return c.retryNewSession(ctx, topics, handler, retries-1, true)
256259
}
257260
}
258261

consumer_group_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"sync"
77
"testing"
88
"time"
9+
10+
assert "github.com/stretchr/testify/require"
911
)
1012

1113
type handler struct {
@@ -200,3 +202,46 @@ outerFor:
200202

201203
cancel()
202204
}
205+
206+
// TestConsumerGroupSessionDoesNotRetryForever ensures that an error fetching
207+
// the coordinator decrements the retry attempts and doesn't end up retrying
208+
// forever
209+
func TestConsumerGroupSessionDoesNotRetryForever(t *testing.T) {
210+
config := NewTestConfig()
211+
config.ClientID = t.Name()
212+
config.Version = V2_0_0_0
213+
config.Consumer.Return.Errors = true
214+
config.Consumer.Group.Rebalance.Retry.Max = 1
215+
config.Consumer.Group.Rebalance.Retry.Backoff = 0
216+
217+
broker0 := NewMockBroker(t, 0)
218+
219+
broker0.SetHandlerByMap(map[string]MockResponse{
220+
"MetadataRequest": NewMockMetadataResponse(t).
221+
SetBroker(broker0.Addr(), broker0.BrokerID()).
222+
SetLeader("my-topic", 0, broker0.BrokerID()),
223+
"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).
224+
SetError(CoordinatorGroup, "my-group", ErrGroupAuthorizationFailed),
225+
})
226+
227+
group, err := NewConsumerGroup([]string{broker0.Addr()}, "my-group", config)
228+
if err != nil {
229+
t.Fatal(err)
230+
}
231+
defer func() { _ = group.Close() }()
232+
233+
ctx, cancel := context.WithCancel(context.Background())
234+
h := &handler{t, cancel}
235+
236+
var wg sync.WaitGroup
237+
wg.Add(1)
238+
239+
go func() {
240+
topics := []string{"my-topic"}
241+
err := group.Consume(ctx, topics, h)
242+
assert.Error(t, err)
243+
wg.Done()
244+
}()
245+
246+
wg.Wait()
247+
}

0 commit comments

Comments
 (0)