Skip to content

Commit

Permalink
make InitProducerID return ErrOutOfBrokers with the the per-broker er…
Browse files Browse the repository at this point in the history
…rors wrapped

Signed-off-by: kwall <[email protected]>
  • Loading branch information
k-wall committed Feb 18, 2022
1 parent 95de5d8 commit cbc3918
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
7 changes: 4 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,23 +212,24 @@ func (client *client) Broker(brokerID int32) (*Broker, error) {
}

func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
err := ErrOutOfBrokers
brokerErrors := make([]error, 0)
for broker := client.any(); broker != nil; broker = client.any() {
var response *InitProducerIDResponse
req := &InitProducerIDRequest{}

response, err = broker.InitProducerID(req)
response, err := broker.InitProducerID(req)
if err == nil {
return response, nil
} else {
// some error, remove that broker and try again
Logger.Printf("Client got error from broker %d when issuing InitProducerID : %v\n", broker.ID(), err)
_ = broker.Close()
brokerErrors = append(brokerErrors, err)
client.deregisterBroker(broker)
}
}

return nil, err
return nil, Wrap(ErrOutOfBrokers, brokerErrors...)
}

func (client *client) Close() error {
Expand Down
31 changes: 31 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,3 +1038,34 @@ func TestClientCoordinatorConnectionRefused(t *testing.T) {

safeClose(t, client)
}

func TestInitProducerIDConnectionRefused(t *testing.T) {
t.Parallel()
seedBroker := NewMockBroker(t, 1)
seedBroker.Returns(&MetadataResponse{Version: 1})

config := NewTestConfig()
config.Producer.Idempotent = true
config.Version = V0_11_0_0
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1

client, err := NewClient([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

seedBroker.Close()

_, err = client.InitProducerID()

if !errors.Is(err, ErrOutOfBrokers) {
t.Fatalf("unexpected error: %v", err)
}

if !errors.Is(err, io.EOF) {
t.Fatalf("unexpected error: %v", err)
}

safeClose(t, client)
}

0 comments on commit cbc3918

Please sign in to comment.