Skip to content

Commit f401b55

Browse files
committed
fix(proto): use full range of MetadataRequest
Even though most of these are identical, we may as well match up correctly. Signed-off-by: Dominic Evans <[email protected]>
1 parent 8c40629 commit f401b55

5 files changed

+46
-35
lines changed

async_producer_test.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -1111,7 +1111,7 @@ func TestAsyncProducerIdempotentGoldenPath(t *testing.T) {
11111111
broker := NewMockBroker(t, 1)
11121112

11131113
metadataResponse := &MetadataResponse{
1114-
Version: 1,
1114+
Version: 4,
11151115
ControllerID: 1,
11161116
}
11171117
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
@@ -1169,7 +1169,7 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
11691169
broker := NewMockBroker(t, 1)
11701170

11711171
metadataResponse := &MetadataResponse{
1172-
Version: 1,
1172+
Version: 4,
11731173
ControllerID: 1,
11741174
}
11751175
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
@@ -1307,7 +1307,7 @@ func TestAsyncProducerIdempotentRetryCheckBatch_2378(t *testing.T) {
13071307
broker := NewMockBroker(t, 1)
13081308

13091309
metadataResponse := &MetadataResponse{
1310-
Version: 1,
1310+
Version: 4,
13111311
ControllerID: 1,
13121312
}
13131313
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
@@ -1375,7 +1375,7 @@ func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
13751375
broker := NewMockBroker(t, 1)
13761376

13771377
metadataResponse := &MetadataResponse{
1378-
Version: 1,
1378+
Version: 4,
13791379
ControllerID: 1,
13801380
}
13811381
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
@@ -1425,7 +1425,7 @@ func TestAsyncProducerIdempotentEpochRollover(t *testing.T) {
14251425
defer broker.Close()
14261426

14271427
metadataResponse := &MetadataResponse{
1428-
Version: 1,
1428+
Version: 4,
14291429
ControllerID: 1,
14301430
}
14311431
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
@@ -1501,7 +1501,7 @@ func TestAsyncProducerIdempotentEpochExhaustion(t *testing.T) {
15011501
)
15021502

15031503
metadataResponse := &MetadataResponse{
1504-
Version: 1,
1504+
Version: 4,
15051505
ControllerID: 1,
15061506
}
15071507
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
@@ -1737,7 +1737,7 @@ func TestTxmngInitProducerId(t *testing.T) {
17371737
defer broker.Close()
17381738

17391739
metadataLeader := new(MetadataResponse)
1740-
metadataLeader.Version = 1
1740+
metadataLeader.Version = 4
17411741
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
17421742
broker.Returns(metadataLeader)
17431743

@@ -1879,7 +1879,7 @@ func TestTxnProduceRecordWithCommit(t *testing.T) {
18791879
config.Net.MaxOpenRequests = 1
18801880

18811881
metadataLeader := new(MetadataResponse)
1882-
metadataLeader.Version = 1
1882+
metadataLeader.Version = 4
18831883
metadataLeader.ControllerID = broker.brokerID
18841884
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
18851885
metadataLeader.AddTopic("test-topic", ErrNoError)
@@ -1960,7 +1960,7 @@ func TestTxnProduceBatchAddPartition(t *testing.T) {
19601960
config.Producer.Partitioner = NewManualPartitioner
19611961

19621962
metadataLeader := new(MetadataResponse)
1963-
metadataLeader.Version = 1
1963+
metadataLeader.Version = 4
19641964
metadataLeader.ControllerID = broker.brokerID
19651965
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
19661966
metadataLeader.AddTopic("test-topic", ErrNoError)
@@ -2067,7 +2067,7 @@ func TestTxnProduceRecordWithAbort(t *testing.T) {
20672067
config.Net.MaxOpenRequests = 1
20682068

20692069
metadataLeader := new(MetadataResponse)
2070-
metadataLeader.Version = 1
2070+
metadataLeader.Version = 4
20712071
metadataLeader.ControllerID = broker.brokerID
20722072
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
20732073
metadataLeader.AddTopic("test-topic", ErrNoError)
@@ -2147,7 +2147,7 @@ func TestTxnCanAbort(t *testing.T) {
21472147
config.Net.MaxOpenRequests = 1
21482148

21492149
metadataLeader := new(MetadataResponse)
2150-
metadataLeader.Version = 1
2150+
metadataLeader.Version = 4
21512151
metadataLeader.ControllerID = broker.brokerID
21522152
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
21532153
metadataLeader.AddTopic("test-topic", ErrNoError)

client_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1071,7 +1071,7 @@ func TestClientCoordinatorConnectionRefused(t *testing.T) {
10711071
func TestInitProducerIDConnectionRefused(t *testing.T) {
10721072
t.Parallel()
10731073
seedBroker := NewMockBroker(t, 1)
1074-
seedBroker.Returns(&MetadataResponse{Version: 1})
1074+
seedBroker.Returns(&MetadataResponse{Version: 4})
10751075

10761076
config := NewTestConfig()
10771077
config.Producer.Idempotent = true

metadata_request.go

+17-11
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ func NewMetadataRequest(version KafkaVersion, topics []string) *MetadataRequest
1717
m.Version = 6
1818
} else if version.IsAtLeast(V1_0_0_0) {
1919
m.Version = 5
20+
} else if version.IsAtLeast(V0_11_0_0) {
21+
m.Version = 4
22+
} else if version.IsAtLeast(V0_10_1_0) {
23+
m.Version = 2
2024
} else if version.IsAtLeast(V0_10_0_0) {
2125
m.Version = 1
2226
}
@@ -94,19 +98,21 @@ func (r *MetadataRequest) isValidVersion() bool {
9498

9599
func (r *MetadataRequest) requiredVersion() KafkaVersion {
96100
switch r.Version {
97-
case 1:
98-
return V0_10_0_0
99-
case 2:
100-
return V0_10_1_0
101-
case 3, 4:
102-
return V0_11_0_0
103-
case 5:
104-
return V1_0_0_0
105-
case 6:
106-
return V2_0_0_0
107101
case 7:
108102
return V2_1_0_0
103+
case 6:
104+
return V2_0_0_0
105+
case 5:
106+
return V1_0_0_0
107+
case 3, 4:
108+
return V0_11_0_0
109+
case 2:
110+
return V0_10_1_0
111+
case 1:
112+
return V0_10_0_0
113+
case 0:
114+
return V0_8_2_0
109115
default:
110-
return MinVersion
116+
return V2_1_0_0
111117
}
112118
}

metadata_response.go

+13-11
Original file line numberDiff line numberDiff line change
@@ -281,20 +281,22 @@ func (r *MetadataResponse) isValidVersion() bool {
281281

282282
func (r *MetadataResponse) requiredVersion() KafkaVersion {
283283
switch r.Version {
284-
case 1:
285-
return V0_10_0_0
286-
case 2:
287-
return V0_10_1_0
288-
case 3, 4:
289-
return V0_11_0_0
290-
case 5:
291-
return V1_0_0_0
292-
case 6:
293-
return V2_0_0_0
294284
case 7:
295285
return V2_1_0_0
286+
case 6:
287+
return V2_0_0_0
288+
case 5:
289+
return V1_0_0_0
290+
case 3, 4:
291+
return V0_11_0_0
292+
case 2:
293+
return V0_10_1_0
294+
case 1:
295+
return V0_10_0_0
296+
case 0:
297+
return V0_8_2_0
296298
default:
297-
return MinVersion
299+
return V2_1_0_0
298300
}
299301
}
300302

sync_producer_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ func TestSyncProducer(t *testing.T) {
5858
}
5959

6060
func TestSyncProducerTransactional(t *testing.T) {
61+
prevLogger := Logger
62+
defer func() { Logger = prevLogger }()
63+
Logger = &testLogger{t}
6164
seedBroker := NewMockBroker(t, 1)
6265
defer seedBroker.Close()
6366
leader := NewMockBroker(t, 2)
@@ -73,7 +76,7 @@ func TestSyncProducerTransactional(t *testing.T) {
7376
config.Net.MaxOpenRequests = 1
7477

7578
metadataResponse := new(MetadataResponse)
76-
metadataResponse.Version = 1
79+
metadataResponse.Version = 4
7780
metadataResponse.ControllerID = leader.BrokerID()
7881
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
7982
metadataResponse.AddTopic("my_topic", ErrNoError)

0 commit comments

Comments
 (0)