Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,11 +700,8 @@ func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry,

for _, rspResource := range rsp.Resources {
if rspResource.Name == resource.Name {
if rspResource.ErrorMsg != "" {
return nil, errors.New(rspResource.ErrorMsg)
}
if rspResource.ErrorCode != 0 {
return nil, KError(rspResource.ErrorCode)
return nil, &DescribeConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg}
}
for _, cfgEntry := range rspResource.Configs {
entries = append(entries, *cfgEntry)
Expand Down Expand Up @@ -758,11 +755,8 @@ func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string

for _, rspResource := range rsp.Resources {
if rspResource.Name == name {
if rspResource.ErrorMsg != "" {
return errors.New(rspResource.ErrorMsg)
}
if rspResource.ErrorCode != 0 {
return KError(rspResource.ErrorCode)
return &AlterConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg}
}
}
}
Expand Down
18 changes: 17 additions & 1 deletion alter_configs_response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package sarama

import "time"
import (
"fmt"
"time"
)

// AlterConfigsResponse is a response type for alter config
type AlterConfigsResponse struct {
Expand All @@ -9,6 +12,19 @@ type AlterConfigsResponse struct {
Resources []*AlterConfigsResourceResponse
}

type AlterConfigError struct {
Err KError
ErrMsg string
}

func (c *AlterConfigError) Error() string {
text := c.Err.Error()
if c.ErrMsg != "" {
text = fmt.Sprintf("%s - %s", text, c.ErrMsg)
}
return text
}

// AlterConfigsResourceResponse is a response type for alter config resource
type AlterConfigsResourceResponse struct {
ErrorCode int16
Expand Down
4 changes: 2 additions & 2 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
bp.parent.returnSuccesses(pSet.msgs)
// Retriable errors
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
if bp.parent.conf.Producer.Retry.Max <= 0 {
bp.parent.abandonBrokerConnection(bp.broker)
bp.parent.returnErrors(pSet.msgs, block.Err)
Expand Down Expand Up @@ -1389,7 +1389,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo

switch block.Err {
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
bp.broker.ID(), topic, partition, block.Err)
if bp.currentRetries[topic] == nil {
Expand Down
7 changes: 7 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,9 +721,16 @@ func (client *client) randomizeSeedBrokers(addrs []string) {
}

func (client *client) updateBroker(brokers []*Broker) {
if client.brokers == nil {
return
}

currentBroker := make(map[int32]*Broker, len(brokers))

for _, broker := range brokers {
if broker == nil {
continue
}
currentBroker[broker.ID()] = broker
if client.brokers[broker.ID()] == nil { // add new broker
client.brokers[broker.ID()] = broker
Expand Down
69 changes: 69 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"

"github.com/rcrowley/go-metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func safeClose(t testing.TB, c io.Closer) {
Expand Down Expand Up @@ -1249,3 +1251,70 @@ func TestMetricsCleanup(t *testing.T) {
t.Errorf("excepted 1 metric, found: %v", all)
}
}

func TestUpdateBroker(t *testing.T) {
t.Run("closed client doesn't panic", func(t *testing.T) {
c := &client{}
fn := func() {
c.updateBroker(nil)
c.updateBroker([]*Broker{
{
id: 0,
addr: "127.0.0.1:9092",
},
})
}
require.NotPanics(t, fn)
})

t.Run("open client adds new broker entries", func(t *testing.T) {
c := &client{
brokers: make(map[int32]*Broker),
}
fn := func() {
c.updateBroker([]*Broker{
{
id: 0,
addr: "127.0.0.1:9092",
},
})
}
require.NotPanics(t, fn)
require.Len(t, c.brokers, 1)
assert.Equal(t, 0, int(c.brokers[0].ID()))
assert.Equal(t, "127.0.0.1:9092", c.brokers[0].Addr())
})

t.Run("open client adds, updates and removes broker entries", func(t *testing.T) {
c := &client{
brokers: map[int32]*Broker{
0: {
id: 0,
addr: "127.0.0.1:9092",
},
1: {
id: 1,
addr: "127.0.0.1:9093",
},
},
}
fn := func() {
c.updateBroker([]*Broker{
{
id: 1,
addr: "127.0.0.1:19093", // new addr for existing broker
},
{
id: 2,
addr: "127.0.0.1:19094",
},
})
}
require.NotPanics(t, fn)
require.Len(t, c.brokers, 2)
assert.Equal(t, 1, int(c.brokers[1].ID()))
assert.Equal(t, "127.0.0.1:19093", c.brokers[1].Addr())
assert.Equal(t, 2, int(c.brokers[2].ID()))
assert.Equal(t, "127.0.0.1:19094", c.brokers[2].Addr())
})
}
13 changes: 13 additions & 0 deletions describe_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ const (
SourceDefault
)

type DescribeConfigError struct {
Err KError
ErrMsg string
}

func (c *DescribeConfigError) Error() string {
text := c.Err.Error()
if c.ErrMsg != "" {
text = fmt.Sprintf("%s - %s", text, c.ErrMsg)
}
return text
}

type DescribeConfigsResponse struct {
Version int16
ThrottleTime time.Duration
Expand Down
Loading