Skip to content

Commit c42b2e0

Browse files
authored
fix(client): ignore empty Metadata responses when refreshing (#2672)
We should skip the metadata refresh if the startup phase broker returns empty brokers in metadata response. The Java client skips the empty response to update the metadata cache (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1149) and we should make a feature parity in Sarama too Fixes #2664 Signed-off-by: Hao Sun <[email protected]>
1 parent 6678dd1 commit c42b2e0

File tree

5 files changed

+92
-10
lines changed

5 files changed

+92
-10
lines changed

Diff for: client.go

+7
Original file line numberDiff line numberDiff line change
@@ -1035,6 +1035,13 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
10351035
var kerror KError
10361036
var packetEncodingError PacketEncodingError
10371037
if err == nil {
1038+
// When talking to the startup phase of a broker, it is possible to receive an empty metadata set. We should remove that broker and try next broker (https://issues.apache.org/jira/browse/KAFKA-7924).
1039+
if len(response.Brokers) == 0 {
1040+
Logger.Println("client/metadata receiving empty brokers from the metadata response when requesting the broker #%d at %s", broker.ID(), broker.addr)
1041+
_ = broker.Close()
1042+
client.deregisterBroker(broker)
1043+
continue
1044+
}
10381045
allKnownMetaData := len(topics) == 0
10391046
// valid response, use it
10401047
shouldRetry, err := client.updateMetadata(response, allKnownMetaData)

Diff for: client_test.go

+78-8
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ func safeClose(t testing.TB, c io.Closer) {
2323
func TestSimpleClient(t *testing.T) {
2424
seedBroker := NewMockBroker(t, 1)
2525

26-
seedBroker.Returns(new(MetadataResponse))
26+
metadataResponse := new(MetadataResponse)
27+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
28+
seedBroker.Returns(metadataResponse)
2729

2830
client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
2931
if err != nil {
@@ -92,6 +94,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
9294
}
9395

9496
metadataResponse = new(MetadataResponse)
97+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
9598
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
9699
seedBroker.Returns(metadataResponse)
97100

@@ -111,6 +114,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
111114
}
112115

113116
metadataResponse = new(MetadataResponse)
117+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
114118
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
115119
seedBroker.Returns(metadataResponse)
116120

@@ -358,6 +362,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) {
358362
seedBroker := NewMockBroker(t, 1)
359363

360364
metadataResponse1 := new(MetadataResponse)
365+
metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
361366
seedBroker.Returns(metadataResponse1)
362367

363368
retryCount := int32(0)
@@ -375,6 +380,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) {
375380

376381
metadataUnknownTopic := new(MetadataResponse)
377382
metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
383+
metadataUnknownTopic.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
378384
seedBroker.Returns(metadataUnknownTopic)
379385
seedBroker.Returns(metadataUnknownTopic)
380386

@@ -395,6 +401,7 @@ func TestClientReceivingUnknownTopic(t *testing.T) {
395401
seedBroker := NewMockBroker(t, 1)
396402

397403
metadataResponse1 := new(MetadataResponse)
404+
metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
398405
seedBroker.Returns(metadataResponse1)
399406

400407
config := NewTestConfig()
@@ -406,6 +413,7 @@ func TestClientReceivingUnknownTopic(t *testing.T) {
406413
}
407414

408415
metadataUnknownTopic := new(MetadataResponse)
416+
metadataUnknownTopic.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
409417
metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
410418
seedBroker.Returns(metadataUnknownTopic)
411419
seedBroker.Returns(metadataUnknownTopic)
@@ -481,6 +489,53 @@ func TestClientReceivingPartialMetadata(t *testing.T) {
481489
leader.Close()
482490
}
483491

492+
func TestClientRefreshBehaviourWhenEmptyMetadataResponse(t *testing.T) {
493+
seedBroker := NewMockBroker(t, 1)
494+
broker := NewMockBroker(t, 2)
495+
496+
metadataResponse1 := new(MetadataResponse)
497+
metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
498+
seedBroker.Returns(metadataResponse1)
499+
500+
c, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
501+
if err != nil {
502+
t.Fatal(err)
503+
}
504+
client := c.(*client)
505+
if len(client.seedBrokers) != 1 {
506+
t.Error("incorrect number of live seeds")
507+
}
508+
if len(client.deadSeeds) != 0 {
509+
t.Error("incorrect number of dead seeds")
510+
}
511+
if len(client.brokers) != 1 {
512+
t.Error("incorrect number of brokers")
513+
}
514+
515+
// Empty metadata response
516+
seedBroker.Returns(new(MetadataResponse))
517+
metadataResponse2 := new(MetadataResponse)
518+
metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
519+
metadataResponse2.AddBroker(broker.Addr(), broker.BrokerID())
520+
seedBroker.Returns(metadataResponse2)
521+
err = c.RefreshMetadata()
522+
if err != nil {
523+
t.Fatal(err)
524+
}
525+
if len(client.seedBrokers) != 1 {
526+
t.Error("incorrect number of live seeds")
527+
}
528+
if len(client.deadSeeds) != 0 {
529+
t.Error("incorrect number of dead seeds")
530+
}
531+
if len(client.brokers) != 2 {
532+
t.Error("incorrect number of brokers")
533+
}
534+
broker.Close()
535+
seedBroker.Close()
536+
safeClose(t, client)
537+
}
538+
484539
func TestClientRefreshBehaviour(t *testing.T) {
485540
seedBroker := NewMockBroker(t, 1)
486541
leader := NewMockBroker(t, 5)
@@ -633,8 +688,9 @@ func TestClientGetBroker(t *testing.T) {
633688

634689
func TestClientResurrectDeadSeeds(t *testing.T) {
635690
initialSeed := NewMockBroker(t, 0)
636-
emptyMetadata := new(MetadataResponse)
637-
initialSeed.Returns(emptyMetadata)
691+
metadataResponse := new(MetadataResponse)
692+
metadataResponse.AddBroker(initialSeed.Addr(), initialSeed.BrokerID())
693+
initialSeed.Returns(metadataResponse)
638694

639695
conf := NewTestConfig()
640696
conf.Metadata.Retry.Backoff = 0
@@ -643,7 +699,6 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
643699
if err != nil {
644700
t.Fatal(err)
645701
}
646-
initialSeed.Close()
647702

648703
client := c.(*client)
649704

@@ -658,6 +713,7 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
658713
safeClose(t, client.seedBrokers[0])
659714
client.seedBrokers = []*Broker{NewBroker(addr1), NewBroker(addr2), NewBroker(addr3)}
660715
client.deadSeeds = []*Broker{}
716+
client.brokers = map[int32]*Broker{}
661717

662718
wg := sync.WaitGroup{}
663719
wg.Add(1)
@@ -676,7 +732,9 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
676732
seed3.Close()
677733

678734
seed1.Close()
679-
seed2.Returns(emptyMetadata)
735+
metadataResponse2 := new(MetadataResponse)
736+
metadataResponse2.AddBroker(seed2.Addr(), seed2.BrokerID())
737+
seed2.Returns(metadataResponse2)
680738

681739
wg.Wait()
682740

@@ -767,6 +825,7 @@ func TestClientMetadataTimeout(t *testing.T) {
767825
// Use a responsive broker to create a working client
768826
initialSeed := NewMockBroker(t, 0)
769827
emptyMetadata := new(MetadataResponse)
828+
emptyMetadata.AddBroker(initialSeed.Addr(), initialSeed.BrokerID())
770829
initialSeed.Returns(emptyMetadata)
771830

772831
conf := NewTestConfig()
@@ -996,6 +1055,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
9961055
coordinator := NewMockBroker(t, 2)
9971056

9981057
metadataResponse1 := new(MetadataResponse)
1058+
metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
9991059
seedBroker.Returns(metadataResponse1)
10001060

10011061
config := NewTestConfig()
@@ -1011,11 +1071,13 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
10111071
seedBroker.Returns(coordinatorResponse1)
10121072

10131073
metadataResponse2 := new(MetadataResponse)
1074+
metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
10141075
metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition)
10151076
seedBroker.Returns(metadataResponse2)
10161077

10171078
replicas := []int32{coordinator.BrokerID()}
10181079
metadataResponse3 := new(MetadataResponse)
1080+
metadataResponse3.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
10191081
metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
10201082
seedBroker.Returns(metadataResponse3)
10211083

@@ -1049,6 +1111,7 @@ func TestClientAutorefreshShutdownRace(t *testing.T) {
10491111
defer seedBroker.Close()
10501112

10511113
metadataResponse := new(MetadataResponse)
1114+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
10521115
seedBroker.Returns(metadataResponse)
10531116

10541117
conf := NewTestConfig()
@@ -1105,7 +1168,9 @@ func TestClientConnectionRefused(t *testing.T) {
11051168
func TestClientCoordinatorConnectionRefused(t *testing.T) {
11061169
t.Parallel()
11071170
seedBroker := NewMockBroker(t, 1)
1108-
seedBroker.Returns(new(MetadataResponse))
1171+
metadataResponse := new(MetadataResponse)
1172+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
1173+
seedBroker.Returns(metadataResponse)
11091174

11101175
client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
11111176
if err != nil {
@@ -1130,7 +1195,10 @@ func TestClientCoordinatorConnectionRefused(t *testing.T) {
11301195
func TestInitProducerIDConnectionRefused(t *testing.T) {
11311196
t.Parallel()
11321197
seedBroker := NewMockBroker(t, 1)
1133-
seedBroker.Returns(&MetadataResponse{Version: 4})
1198+
metadataResponse := new(MetadataResponse)
1199+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
1200+
metadataResponse.Version = 4
1201+
seedBroker.Returns(metadataResponse)
11341202

11351203
config := NewTestConfig()
11361204
config.Producer.Idempotent = true
@@ -1161,7 +1229,9 @@ func TestInitProducerIDConnectionRefused(t *testing.T) {
11611229
func TestMetricsCleanup(t *testing.T) {
11621230
seedBroker := NewMockBroker(t, 1)
11631231
defer seedBroker.Close()
1164-
seedBroker.Returns(new(MetadataResponse))
1232+
metadataResponse := new(MetadataResponse)
1233+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
1234+
seedBroker.Returns(metadataResponse)
11651235

11661236
config := NewTestConfig()
11671237
metrics.GetOrRegisterMeter("a", config.MetricRegistry)

Diff for: client_tls_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,9 @@ func doListenerTLSTest(t *testing.T, expectSuccess bool, serverConfig, clientCon
197197
seedBroker := NewMockBrokerListener(childT, 1, seedListener)
198198
defer seedBroker.Close()
199199

200-
seedBroker.Returns(new(MetadataResponse))
200+
metadataResponse := new(MetadataResponse)
201+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
202+
seedBroker.Returns(metadataResponse)
201203

202204
config := NewTestConfig()
203205
config.Net.TLS.Enable = true

Diff for: offset_manager_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ func initPartitionOffsetManager(t *testing.T, om OffsetManager,
7878

7979
func TestNewOffsetManager(t *testing.T) {
8080
seedBroker := NewMockBroker(t, 1)
81-
seedBroker.Returns(new(MetadataResponse))
81+
metadataResponse := new(MetadataResponse)
82+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
83+
seedBroker.Returns(metadataResponse)
8284
defer seedBroker.Close()
8385

8486
testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())

Diff for: sync_producer_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) {
271271
}
272272

273273
metadataResponse = new(MetadataResponse)
274+
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
274275
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
275276
broker.Returns(metadataResponse)
276277

0 commit comments

Comments
 (0)