Skip to content

Commit 72ea327

Browse files
authored
feat(proto): implement and use MetadataRequest v7 (#2388)
In order to receive the LeaderEpoch as part of the metadata response we need to be sending and receiving version 7 of the MetadataRequest+Response protocol. Luckily the first flexible version wasn't until v8 so the changes to achieve this are minimal. Contributes-to: #2365
1 parent 38204cb commit 72ea327

File tree

6 files changed

+156
-138
lines changed

6 files changed

+156
-138
lines changed

admin.go

+3-20
Original file line numberDiff line numberDiff line change
@@ -280,17 +280,7 @@ func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetada
280280
return nil, err
281281
}
282282

283-
request := &MetadataRequest{
284-
Topics: topics,
285-
AllowAutoTopicCreation: false,
286-
}
287-
288-
if ca.conf.Version.IsAtLeast(V1_0_0_0) {
289-
request.Version = 5
290-
} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
291-
request.Version = 4
292-
}
293-
283+
request := NewMetadataRequest(ca.conf.Version, topics)
294284
response, err := controller.GetMetadata(request)
295285
if err != nil {
296286
return nil, err
@@ -304,14 +294,7 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32
304294
return nil, int32(0), err
305295
}
306296

307-
request := &MetadataRequest{
308-
Topics: []string{},
309-
}
310-
311-
if ca.conf.Version.IsAtLeast(V0_10_0_0) {
312-
request.Version = 1
313-
}
314-
297+
request := NewMetadataRequest(ca.conf.Version, nil)
315298
response, err := controller.GetMetadata(request)
316299
if err != nil {
317300
return nil, int32(0), err
@@ -352,7 +335,7 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
352335
}
353336
_ = b.Open(ca.client.Config())
354337

355-
metadataReq := &MetadataRequest{}
338+
metadataReq := NewMetadataRequest(ca.conf.Version, nil)
356339
metadataResp, err := b.GetMetadata(metadataReq)
357340
if err != nil {
358341
return nil, err

async_producer_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1711,7 +1711,7 @@ func TestTxnProduceBumpEpoch(t *testing.T) {
17111711
config.ApiVersionsRequest = false
17121712

17131713
metadataLeader := new(MetadataResponse)
1714-
metadataLeader.Version = 5
1714+
metadataLeader.Version = 7
17151715
metadataLeader.ControllerID = broker.brokerID
17161716
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
17171717
metadataLeader.AddTopic("test-topic", ErrNoError)

client.go

+2-7
Original file line numberDiff line numberDiff line change
@@ -989,13 +989,8 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
989989
DebugLogger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
990990
}
991991

992-
req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
993-
if client.conf.Version.IsAtLeast(V1_0_0_0) {
994-
req.Version = 5
995-
} else if client.conf.Version.IsAtLeast(V0_10_0_0) {
996-
req.Version = 1
997-
}
998-
992+
req := NewMetadataRequest(client.conf.Version, topics)
993+
req.AllowAutoTopicCreation = allowAutoTopicCreation
999994
t := atomic.LoadInt64(&client.updateMetaDataMs)
1000995
if !atomic.CompareAndSwapInt64(&client.updateMetaDataMs, t, time.Now().UnixNano()/int64(time.Millisecond)) {
1001996
return nil

metadata_request.go

+33-10
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,30 @@
11
package sarama
22

33
type MetadataRequest struct {
4-
Version int16
5-
Topics []string
4+
// Version defines the protocol version to use for encode and decode
5+
Version int16
6+
// Topics contains the topics to fetch metadata for.
7+
Topics []string
8+
// AllowAutoTopicCreation contains a If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so.
69
AllowAutoTopicCreation bool
710
}
811

9-
func (r *MetadataRequest) encode(pe packetEncoder) error {
10-
if r.Version < 0 || r.Version > 5 {
12+
func NewMetadataRequest(version KafkaVersion, topics []string) *MetadataRequest {
13+
m := &MetadataRequest{Topics: topics}
14+
if version.IsAtLeast(V2_1_0_0) {
15+
m.Version = 7
16+
} else if version.IsAtLeast(V2_0_0_0) {
17+
m.Version = 6
18+
} else if version.IsAtLeast(V1_0_0_0) {
19+
m.Version = 5
20+
} else if version.IsAtLeast(V0_10_0_0) {
21+
m.Version = 1
22+
}
23+
return m
24+
}
25+
26+
func (r *MetadataRequest) encode(pe packetEncoder) (err error) {
27+
if r.Version < 0 || r.Version > 12 {
1128
return PacketEncodingError{"invalid or unsupported MetadataRequest version field"}
1229
}
1330
if r.Version == 0 || len(r.Topics) > 0 {
@@ -25,13 +42,15 @@ func (r *MetadataRequest) encode(pe packetEncoder) error {
2542
} else {
2643
pe.putInt32(-1)
2744
}
28-
if r.Version > 3 {
45+
46+
if r.Version >= 4 {
2947
pe.putBool(r.AllowAutoTopicCreation)
3048
}
49+
3150
return nil
3251
}
3352

34-
func (r *MetadataRequest) decode(pd packetDecoder, version int16) error {
53+
func (r *MetadataRequest) decode(pd packetDecoder, version int16) (err error) {
3554
r.Version = version
3655
size, err := pd.getInt32()
3756
if err != nil {
@@ -47,13 +66,13 @@ func (r *MetadataRequest) decode(pd packetDecoder, version int16) error {
4766
r.Topics[i] = topic
4867
}
4968
}
50-
if r.Version > 3 {
51-
autoCreation, err := pd.getBool()
52-
if err != nil {
69+
70+
if r.Version >= 4 {
71+
if r.AllowAutoTopicCreation, err = pd.getBool(); err != nil {
5372
return err
5473
}
55-
r.AllowAutoTopicCreation = autoCreation
5674
}
75+
5776
return nil
5877
}
5978

@@ -79,6 +98,10 @@ func (r *MetadataRequest) requiredVersion() KafkaVersion {
7998
return V0_11_0_0
8099
case 5:
81100
return V1_0_0_0
101+
case 6:
102+
return V2_0_0_0
103+
case 7:
104+
return V2_1_0_0
82105
default:
83106
return MinVersion
84107
}

0 commit comments

Comments
 (0)