From b678494741ce5622ef4bf67b28fccd61b136dc9c Mon Sep 17 00:00:00 2001 From: Dominic Evans <8060970+dnwe@users.noreply.github.com> Date: Thu, 27 Nov 2025 16:12:01 +0000 Subject: [PATCH 1/3] fix(client): add nilguards to updateBroker (#3393) This _should_ be a rare edge case as the updateBroker func is only called from updateMetadata which does already do an up-front `client.Close()` check under read-lock before then acquiring the write lock. However, there's potentially a small window of opportunity that if client.Close() was called whilst metadata refresh was in-flight and for whatever reason the updateMetadata goroutine gets pre-empted in-between the readlock release and the write lock acquire then the client could have been closed and so `client.brokers` will be nil. I wouldn't have expected this to ever happen, but it was reported by a user in an older Sarama version in #3391 and there's no harm in adding the nilguard just in case. Signed-off-by: Dominic Evans --- client.go | 7 +++++ client_test.go | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+) diff --git a/client.go b/client.go index c6364eead..2f00a8b79 100644 --- a/client.go +++ b/client.go @@ -721,9 +721,16 @@ func (client *client) randomizeSeedBrokers(addrs []string) { } func (client *client) updateBroker(brokers []*Broker) { + if client.brokers == nil { + return + } + currentBroker := make(map[int32]*Broker, len(brokers)) for _, broker := range brokers { + if broker == nil { + continue + } currentBroker[broker.ID()] = broker if client.brokers[broker.ID()] == nil { // add new broker client.brokers[broker.ID()] = broker diff --git a/client_test.go b/client_test.go index 5b485ae4d..8081217b8 100644 --- a/client_test.go +++ b/client_test.go @@ -10,6 +10,8 @@ import ( "time" "github.com/rcrowley/go-metrics" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func safeClose(t testing.TB, c io.Closer) { @@ -1249,3 +1251,70 @@ func TestMetricsCleanup(t *testing.T) { t.Errorf("excepted 1 metric, found: %v", all) } } + +func TestUpdateBroker(t *testing.T) { + t.Run("closed client doesn't panic", func(t *testing.T) { + c := &client{} + fn := func() { + c.updateBroker(nil) + c.updateBroker([]*Broker{ + { + id: 0, + addr: "127.0.0.1:9092", + }, + }) + } + require.NotPanics(t, fn) + }) + + t.Run("open client adds new broker entries", func(t *testing.T) { + c := &client{ + brokers: make(map[int32]*Broker), + } + fn := func() { + c.updateBroker([]*Broker{ + { + id: 0, + addr: "127.0.0.1:9092", + }, + }) + } + require.NotPanics(t, fn) + require.Len(t, c.brokers, 1) + assert.Equal(t, 0, int(c.brokers[0].ID())) + assert.Equal(t, "127.0.0.1:9092", c.brokers[0].Addr()) + }) + + t.Run("open client adds, updates and removes broker entries", func(t *testing.T) { + c := &client{ + brokers: map[int32]*Broker{ + 0: { + id: 0, + addr: "127.0.0.1:9092", + }, + 1: { + id: 1, + addr: "127.0.0.1:9093", + }, + }, + } + fn := func() { + c.updateBroker([]*Broker{ + { + id: 1, + addr: "127.0.0.1:19093", // new addr for existing broker + }, + { + id: 2, + addr: "127.0.0.1:19094", + }, + }) + } + require.NotPanics(t, fn) + require.Len(t, c.brokers, 2) + assert.Equal(t, 1, int(c.brokers[1].ID())) + assert.Equal(t, "127.0.0.1:19093", c.brokers[1].Addr()) + assert.Equal(t, 2, int(c.brokers[2].ID())) + assert.Equal(t, "127.0.0.1:19094", c.brokers[2].Addr()) + }) +} From c49f0fb8bf6e83cba84e7ba7c1fde58d9c24f35d Mon Sep 17 00:00:00 2001 From: zhuliquan Date: Thu, 1 Feb 2024 05:08:20 +0800 Subject: [PATCH 2/3] feat: return KError instead of errors in AlterConfigs and DescribeConfigs (#2472) Signed-off-by: zhuliquan Co-authored-by: zhuliquan --- admin.go | 10 ++-------- alter_configs_response.go | 18 +++++++++++++++++- describe_configs_response.go | 13 +++++++++++++ 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/admin.go b/admin.go index 008233f0c..7c07c2346 100644 --- a/admin.go +++ b/admin.go @@ -700,11 +700,8 @@ func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, for _, rspResource := range rsp.Resources { if rspResource.Name == resource.Name { - if rspResource.ErrorMsg != "" { - return nil, errors.New(rspResource.ErrorMsg) - } if rspResource.ErrorCode != 0 { - return nil, KError(rspResource.ErrorCode) + return nil, &DescribeConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg} } for _, cfgEntry := range rspResource.Configs { entries = append(entries, *cfgEntry) @@ -758,11 +755,8 @@ func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string for _, rspResource := range rsp.Resources { if rspResource.Name == name { - if rspResource.ErrorMsg != "" { - return errors.New(rspResource.ErrorMsg) - } if rspResource.ErrorCode != 0 { - return KError(rspResource.ErrorCode) + return &AlterConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg} } } } diff --git a/alter_configs_response.go b/alter_configs_response.go index 658f32e9a..d8b70e371 100644 --- a/alter_configs_response.go +++ b/alter_configs_response.go @@ -1,6 +1,9 @@ package sarama -import "time" +import ( + "fmt" + "time" +) // AlterConfigsResponse is a response type for alter config type AlterConfigsResponse struct { @@ -9,6 +12,19 @@ type AlterConfigsResponse struct { Resources []*AlterConfigsResourceResponse } +type AlterConfigError struct { + Err KError + ErrMsg string +} + +func (c *AlterConfigError) Error() string { + text := c.Err.Error() + if c.ErrMsg != "" { + text = fmt.Sprintf("%s - %s", text, c.ErrMsg) + } + return text +} + // AlterConfigsResourceResponse is a response type for alter config resource type AlterConfigsResourceResponse struct { ErrorCode int16 diff --git a/describe_configs_response.go b/describe_configs_response.go index 8aed5de85..386a56885 100644 --- a/describe_configs_response.go +++ b/describe_configs_response.go @@ -34,6 +34,19 @@ const ( SourceDefault ) +type DescribeConfigError struct { + Err KError + ErrMsg string +} + +func (c *DescribeConfigError) Error() string { + text := c.Err.Error() + if c.ErrMsg != "" { + text = fmt.Sprintf("%s - %s", text, c.ErrMsg) + } + return text +} + type DescribeConfigsResponse struct { Version int16 ThrottleTime time.Duration From 3e8f7a1123b67ae1bc8bb060057e1e5d439958ba Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 7 Aug 2024 13:26:41 -0500 Subject: [PATCH 3/3] fix(producer): treat ErrKafkaStorageError as retriable (#2939) This is retriable according to the spec: https://kafka.apache.org/protocol.html Signed-off-by: Richard Artoul --- async_producer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/async_producer.go b/async_producer.go index 97f2296da..24f863b88 100644 --- a/async_producer.go +++ b/async_producer.go @@ -1350,7 +1350,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo bp.parent.returnSuccesses(pSet.msgs) // Retriable errors case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition, - ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: + ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError: if bp.parent.conf.Producer.Retry.Max <= 0 { bp.parent.abandonBrokerConnection(bp.broker) bp.parent.returnErrors(pSet.msgs, block.Err) @@ -1389,7 +1389,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo switch block.Err { case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition, - ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: + ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError: Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n", bp.broker.ID(), topic, partition, block.Err) if bp.currentRetries[topic] == nil {