@@ -37,6 +37,7 @@ func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) {
37
37
config .Version = V2_0_0_0
38
38
config .Consumer .Return .Errors = true
39
39
config .Consumer .Group .Rebalance .Retry .Max = 2
40
+ config .Consumer .Group .Rebalance .Retry .Backoff = 0
40
41
config .Consumer .Offsets .AutoCommit .Enable = false
41
42
42
43
broker0 := NewMockBroker (t , 0 )
@@ -100,72 +101,46 @@ func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) {
100
101
}
101
102
102
103
func TestConsume_RaceTest (t * testing.T ) {
103
- const groupID = "test-group"
104
- const topic = "test-topic"
105
- const offsetStart = int64 (1234 )
104
+ const (
105
+ groupID = "test-group"
106
+ topic = "test-topic"
107
+ offsetStart = int64 (1234 )
108
+ )
106
109
107
- cfg := NewConfig ()
110
+ cfg := NewTestConfig ()
108
111
cfg .Version = V2_8_1_0
109
112
cfg .Consumer .Return .Errors = true
113
+ cfg .Metadata .Full = true
110
114
111
115
seedBroker := NewMockBroker (t , 1 )
112
-
113
- joinGroupResponse := & JoinGroupResponse {}
114
-
115
- syncGroupResponse := & SyncGroupResponse {
116
- Version : 3 , // sarama > 2.3.0.0 uses version 3
117
- }
118
- // Leverage mock response to get the MemberAssignment bytes
119
- mockSyncGroupResponse := NewMockSyncGroupResponse (t ).SetMemberAssignment (& ConsumerGroupMemberAssignment {
120
- Version : 1 ,
121
- Topics : map [string ][]int32 {topic : {0 }}, // map "test-topic" to partition 0
122
- UserData : []byte {0x01 },
123
- })
124
- syncGroupResponse .MemberAssignment = mockSyncGroupResponse .MemberAssignment
125
-
126
- heartbeatResponse := & HeartbeatResponse {
127
- Err : ErrNoError ,
128
- }
129
- offsetFetchResponse := & OffsetFetchResponse {
130
- Version : 1 ,
131
- ThrottleTimeMs : 0 ,
132
- Err : ErrNoError ,
133
- }
134
- offsetFetchResponse .AddBlock (topic , 0 , & OffsetFetchResponseBlock {
135
- Offset : offsetStart ,
136
- LeaderEpoch : 0 ,
137
- Metadata : "" ,
138
- Err : ErrNoError ,
139
- })
140
-
141
- offsetResponse := & OffsetResponse {
142
- Version : 1 ,
143
- }
144
- offsetResponse .AddTopicPartition (topic , 0 , offsetStart )
145
-
146
- metadataResponse := new (MetadataResponse )
147
- metadataResponse .Version = 10
148
- metadataResponse .AddBroker (seedBroker .Addr (), seedBroker .BrokerID ())
149
- metadataResponse .AddTopic ("mismatched-topic" , ErrUnknownTopicOrPartition )
116
+ defer seedBroker .Close ()
150
117
151
118
handlerMap := map [string ]MockResponse {
152
119
"ApiVersionsRequest" : NewMockApiVersionsResponse (t ),
153
- "MetadataRequest" : NewMockSequence (metadataResponse ),
154
- "OffsetRequest" : NewMockSequence (offsetResponse ),
155
- "OffsetFetchRequest" : NewMockSequence (offsetFetchResponse ),
156
- "FindCoordinatorRequest" : NewMockSequence (NewMockFindCoordinatorResponse (t ).
157
- SetCoordinator (CoordinatorGroup , groupID , seedBroker )),
158
- "JoinGroupRequest" : NewMockSequence (joinGroupResponse ),
159
- "SyncGroupRequest" : NewMockSequence (syncGroupResponse ),
160
- "HeartbeatRequest" : NewMockSequence (heartbeatResponse ),
120
+ "MetadataRequest" : NewMockMetadataResponse (t ).
121
+ SetBroker (seedBroker .Addr (), seedBroker .BrokerID ()).
122
+ SetError ("mismatched-topic" , ErrUnknownTopicOrPartition ),
123
+ "OffsetRequest" : NewMockOffsetResponse (t ).
124
+ SetOffset (topic , 0 , - 1 , offsetStart ),
125
+ "OffsetFetchRequest" : NewMockOffsetFetchResponse (t ).
126
+ SetOffset (groupID , topic , 0 , offsetStart , "" , ErrNoError ),
127
+ "FindCoordinatorRequest" : NewMockFindCoordinatorResponse (t ).
128
+ SetCoordinator (CoordinatorGroup , groupID , seedBroker ),
129
+ "JoinGroupRequest" : NewMockJoinGroupResponse (t ),
130
+ "SyncGroupRequest" : NewMockSyncGroupResponse (t ).SetMemberAssignment (
131
+ & ConsumerGroupMemberAssignment {
132
+ Version : 1 ,
133
+ Topics : map [string ][]int32 {topic : {0 }}, // map "test-topic" to partition 0
134
+ UserData : []byte {0x01 },
135
+ },
136
+ ),
137
+ "HeartbeatRequest" : NewMockHeartbeatResponse (t ),
161
138
}
162
139
seedBroker .SetHandlerByMap (handlerMap )
163
140
164
- cancelCtx , cancel := context .WithDeadline (context .Background (), time .Now ().Add (4 * time .Second ))
165
-
166
- defer seedBroker .Close ()
141
+ cancelCtx , cancel := context .WithDeadline (context .Background (), time .Now ().Add (time .Second ))
167
142
168
- retryWait := 20 * time .Millisecond
143
+ retryWait := 10 * time .Millisecond
169
144
var err error
170
145
clientRetries := 0
171
146
outerFor:
@@ -195,8 +170,8 @@ outerFor:
195
170
t .Fatalf ("should not proceed to Consume" )
196
171
}
197
172
198
- if clientRetries <= 0 {
199
- t .Errorf ("clientRetries = %v; want > 0 " , clientRetries )
173
+ if clientRetries <= 1 {
174
+ t .Errorf ("clientRetries = %v; want > 1 " , clientRetries )
200
175
}
201
176
202
177
if err != nil && ! errors .Is (err , context .DeadlineExceeded ) {
0 commit comments