From 89b5ab87c8929603bb012ebd4c10ed980a8a6514 Mon Sep 17 00:00:00 2001 From: Adrian Preston Date: Wed, 1 Nov 2023 10:45:36 +0000 Subject: [PATCH 1/3] Fix data race on Broker.done channel (#2698) The underlying case was not waiting for the goroutine running the `responseReceiver()` method to fully complete if SASL authentication failed. This created a window where a further call to `Broker.Open()` could overwrite the `Broker.done` channel value while the goroutine still running `responseReceiver()` was trying to close the same channel. Fixes: #2382 Signed-off-by: Adrian Preston --- broker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/broker.go b/broker.go index 9ed62eb86..555c02cb0 100644 --- a/broker.go +++ b/broker.go @@ -261,6 +261,7 @@ func (b *Broker) Open(conf *Config) error { b.connErr = b.authenticateViaSASLv1() if b.connErr != nil { close(b.responses) + <-b.done err = b.conn.Close() if err == nil { DebugLogger.Printf("Closed connection to broker %s\n", b.addr) From c880dcebe9d6a7c353c6838405650d15fdc5d74a Mon Sep 17 00:00:00 2001 From: bvalente Date: Mon, 15 Sep 2025 12:58:40 +0100 Subject: [PATCH 2/3] fix: add read deadline to tls write (#3283) Related to: - https://github.com/golang/go/issues/13828 - https://github.com/IBM/sarama/issues/1722 We're using https://github.com/Mongey/terraform-provider-kafka to manage Kafka Topics with Terraform. Recently we've changed from Plaintext communications to AWS IAM Authentication. When doing so, our provider sometimes would hang indefinitely on some plans. We pinned this to the `kafka.t3.small` cluster tiers, as these have several limitations, including a maximum of 4 TCP connections per second. While debugging the provider, we understood that the Call Stack was stuck on writing to the cluster, more specifically right on the first communication that it was trying to do with the clusters. Reading through the code, we found a very interesting comment for the Write function of the TLS package. 
https://github.com/golang/go/blob/go1.23.0/src/crypto/tls/conn.go#L1192-L1195 ``` // As Write calls [Conn.Handshake], in order to prevent indefinite blocking a deadline // must be set for both [Conn.Read] and Write before Write is called when the handshake // has not yet completed. See [Conn.SetDeadline], [Conn.SetReadDeadline], and // [Conn.SetWriteDeadline]. ``` Based on this, TLS requires both Write and Read Deadlines to be set because the Write function may do a handshake on the first communication, and the handshake both Writes and Reads. I believe that in our case, since we are working with brokers that don't have a very reliable network, sometimes the handshake would not progress on the server side, and we would indefinitely wait for a Read that would never come. After implementing this change in our local workstation, instead of experiencing indefinite hanging, the program would finally report some type of error: ``` Error: kafka: client has run out of available brokers to talk to: read tcp 10.xxx.xxx.xxx:59582->10.xxx.xxx.xxx:9098: i/o timeout ``` Signed-off-by: Bernardo Valente --- broker.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/broker.go b/broker.go index 555c02cb0..775ef0798 100644 --- a/broker.go +++ b/broker.go @@ -939,12 +939,21 @@ func (b *Broker) readFull(buf []byte) (n int, err error) { return io.ReadFull(b.conn, buf) } -// write ensures the conn WriteDeadline has been setup before making a +// write ensures the conn Deadline has been setup before making a // call to conn.Write func (b *Broker) write(buf []byte) (n int, err error) { - if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil { + now := time.Now() + if err := b.conn.SetWriteDeadline(now.Add(b.conf.Net.WriteTimeout)); err != nil { return 0, err } + // TLS connections require both read and write deadlines to be set + // to avoid handshake indefinite blocking + // see 
https://github.com/golang/go/blob/go1.23.0/src/crypto/tls/conn.go#L1192-L1195 + if b.conf.Net.TLS.Enable { + if err := b.conn.SetReadDeadline(now.Add(b.conf.Net.ReadTimeout)); err != nil { + return 0, err + } + } return b.conn.Write(buf) } From 08efa56a30204d1ca3c26414da71a0f747e52571 Mon Sep 17 00:00:00 2001 From: HaoSunUber <86338940+HaoSunUber@users.noreply.github.com> Date: Fri, 13 Oct 2023 13:54:35 -0700 Subject: [PATCH 3/3] fix(client): ignore empty Metadata responses when refreshing (#2672) We should skip the metadata refresh if the startup phase broker returns empty brokers in metadata response. The Java client skips the empty response to update the metadata cache (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1149) and we should make a feature parity in Sarama too Fixes #2664 Signed-off-by: Hao Sun --- client.go | 7 ++++ client_test.go | 86 ++++++++++++++++++++++++++++++++++++++---- client_tls_test.go | 4 +- offset_manager_test.go | 4 +- sync_producer_test.go | 1 + 5 files changed, 92 insertions(+), 10 deletions(-) diff --git a/client.go b/client.go index 5e665eaef..c6364eead 100644 --- a/client.go +++ b/client.go @@ -1035,6 +1035,13 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, var kerror KError var packetEncodingError PacketEncodingError if err == nil { + // When talking to the startup phase of a broker, it is possible to receive an empty metadata set. We should remove that broker and try next broker (https://issues.apache.org/jira/browse/KAFKA-7924). 
+ if len(response.Brokers) == 0 { + Logger.Printf("client/metadata receiving empty brokers from the metadata response when requesting the broker #%d at %s", broker.ID(), broker.addr) + _ = broker.Close() + client.deregisterBroker(broker) + continue + } allKnownMetaData := len(topics) == 0 // valid response, use it shouldRetry, err := client.updateMetadata(response, allKnownMetaData) diff --git a/client_test.go b/client_test.go index d65fdf4fb..5b485ae4d 100644 --- a/client_test.go +++ b/client_test.go @@ -23,7 +23,9 @@ func safeClose(t testing.TB, c io.Closer) { func TestSimpleClient(t *testing.T) { seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(new(MetadataResponse)) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse) client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { @@ -92,6 +94,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { } metadataResponse = new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataResponse) @@ -111,6 +114,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { } metadataResponse = new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataResponse) @@ -358,6 +362,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { seedBroker := NewMockBroker(t, 1) metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse1) retryCount := int32(0) @@ -375,6 +380,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { metadataUnknownTopic := new(MetadataResponse) 
metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition) + metadataUnknownTopic.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataUnknownTopic) seedBroker.Returns(metadataUnknownTopic) @@ -395,6 +401,7 @@ func TestClientReceivingUnknownTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse1) config := NewTestConfig() @@ -406,6 +413,7 @@ func TestClientReceivingUnknownTopic(t *testing.T) { } metadataUnknownTopic := new(MetadataResponse) + metadataUnknownTopic.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataUnknownTopic) seedBroker.Returns(metadataUnknownTopic) @@ -481,6 +489,53 @@ func TestClientReceivingPartialMetadata(t *testing.T) { leader.Close() } +func TestClientRefreshBehaviourWhenEmptyMetadataResponse(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + broker := NewMockBroker(t, 2) + + metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse1) + + c, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) + if err != nil { + t.Fatal(err) + } + client := c.(*client) + if len(client.seedBrokers) != 1 { + t.Error("incorrect number of live seeds") + } + if len(client.deadSeeds) != 0 { + t.Error("incorrect number of dead seeds") + } + if len(client.brokers) != 1 { + t.Error("incorrect number of brokers") + } + + // Empty metadata response + seedBroker.Returns(new(MetadataResponse)) + metadataResponse2 := new(MetadataResponse) + metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + metadataResponse2.AddBroker(broker.Addr(), broker.BrokerID()) + seedBroker.Returns(metadataResponse2) + err = c.RefreshMetadata() + if err != nil { + t.Fatal(err) + } + if 
len(client.seedBrokers) != 1 { + t.Error("incorrect number of live seeds") + } + if len(client.deadSeeds) != 0 { + t.Error("incorrect number of dead seeds") + } + if len(client.brokers) != 2 { + t.Error("incorrect number of brokers") + } + broker.Close() + seedBroker.Close() + safeClose(t, client) +} + func TestClientRefreshBehaviour(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) @@ -633,8 +688,9 @@ func TestClientGetBroker(t *testing.T) { func TestClientResurrectDeadSeeds(t *testing.T) { initialSeed := NewMockBroker(t, 0) - emptyMetadata := new(MetadataResponse) - initialSeed.Returns(emptyMetadata) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(initialSeed.Addr(), initialSeed.BrokerID()) + initialSeed.Returns(metadataResponse) conf := NewTestConfig() conf.Metadata.Retry.Backoff = 0 @@ -643,7 +699,6 @@ func TestClientResurrectDeadSeeds(t *testing.T) { if err != nil { t.Fatal(err) } - initialSeed.Close() client := c.(*client) @@ -658,6 +713,7 @@ func TestClientResurrectDeadSeeds(t *testing.T) { safeClose(t, client.seedBrokers[0]) client.seedBrokers = []*Broker{NewBroker(addr1), NewBroker(addr2), NewBroker(addr3)} client.deadSeeds = []*Broker{} + client.brokers = map[int32]*Broker{} wg := sync.WaitGroup{} wg.Add(1) @@ -676,7 +732,9 @@ func TestClientResurrectDeadSeeds(t *testing.T) { seed3.Close() seed1.Close() - seed2.Returns(emptyMetadata) + metadataResponse2 := new(MetadataResponse) + metadataResponse2.AddBroker(seed2.Addr(), seed2.BrokerID()) + seed2.Returns(metadataResponse2) wg.Wait() @@ -766,6 +824,7 @@ func TestClientMetadataTimeout(t *testing.T) { // Use a responsive broker to create a working client initialSeed := NewMockBroker(t, 0) emptyMetadata := new(MetadataResponse) + emptyMetadata.AddBroker(initialSeed.Addr(), initialSeed.BrokerID()) initialSeed.Returns(emptyMetadata) conf := NewTestConfig() @@ -995,6 +1054,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { coordinator 
:= NewMockBroker(t, 2) metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse1) config := NewTestConfig() @@ -1010,11 +1070,13 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { seedBroker.Returns(coordinatorResponse1) metadataResponse2 := new(MetadataResponse) + metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataResponse2) replicas := []int32{coordinator.BrokerID()} metadataResponse3 := new(MetadataResponse) + metadataResponse3.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError) seedBroker.Returns(metadataResponse3) @@ -1048,6 +1110,7 @@ func TestClientAutorefreshShutdownRace(t *testing.T) { defer seedBroker.Close() metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse) conf := NewTestConfig() @@ -1104,7 +1167,9 @@ func TestClientConnectionRefused(t *testing.T) { func TestClientCoordinatorConnectionRefused(t *testing.T) { t.Parallel() seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(new(MetadataResponse)) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse) client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { @@ -1129,7 +1194,10 @@ func TestClientCoordinatorConnectionRefused(t *testing.T) { func TestInitProducerIDConnectionRefused(t *testing.T) { t.Parallel() seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(&MetadataResponse{Version: 4}) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + 
metadataResponse.Version = 4 + seedBroker.Returns(metadataResponse) config := NewTestConfig() config.Producer.Idempotent = true @@ -1160,7 +1228,9 @@ func TestInitProducerIDConnectionRefused(t *testing.T) { func TestMetricsCleanup(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() - seedBroker.Returns(new(MetadataResponse)) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse) config := NewTestConfig() metrics.GetOrRegisterMeter("a", config.MetricRegistry) diff --git a/client_tls_test.go b/client_tls_test.go index 5ec0bfb75..dc45376b1 100644 --- a/client_tls_test.go +++ b/client_tls_test.go @@ -196,7 +196,9 @@ func doListenerTLSTest(t *testing.T, expectSuccess bool, serverConfig, clientCon seedBroker := NewMockBrokerListener(childT, 1, seedListener) defer seedBroker.Close() - seedBroker.Returns(new(MetadataResponse)) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse) config := NewTestConfig() config.Net.TLS.Enable = true diff --git a/offset_manager_test.go b/offset_manager_test.go index da38f0372..7102c17f3 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -78,7 +78,9 @@ func initPartitionOffsetManager(t *testing.T, om OffsetManager, func TestNewOffsetManager(t *testing.T) { seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(new(MetadataResponse)) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse) defer seedBroker.Close() testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) diff --git a/sync_producer_test.go b/sync_producer_test.go index 8d366b011..776ae5f69 100644 --- a/sync_producer_test.go +++ b/sync_producer_test.go @@ -271,6 +271,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) { } 
metadataResponse = new(MetadataResponse) + metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition) broker.Returns(metadataResponse)