Skip to content

Commit

Permalink
Fix IBM#2128 - make ErrOutOfBrokers wrap the underlying net exception…
Browse files Browse the repository at this point in the history
…s that prevented connection
  • Loading branch information
k-wall committed Feb 7, 2022
1 parent 69e75b7 commit 2d2a020
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 4 deletions.
11 changes: 8 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
}

broker := client.any()
brokerErrors := make([]error, 0)
for ; broker != nil && !pastDeadline(0); broker = client.any() {
allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation
if len(topics) > 0 {
Expand Down Expand Up @@ -928,19 +929,21 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
default:
// some other error, remove that broker and try again
Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
brokerErrors = append(brokerErrors, err)
_ = broker.Close()
client.deregisterBroker(broker)
}
}

error := Wrap(ErrOutOfBrokers, brokerErrors...)
if broker != nil {
Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
return retry(ErrOutOfBrokers)
return retry(error)
}

Logger.Println("client/metadata no available broker to send metadata request to")
client.resurrectDeadBrokers()
return retry(ErrOutOfBrokers)
return retry(error)
}

// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
Expand Down Expand Up @@ -1047,6 +1050,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
return nil, err
}

brokerErrors := make([]error, 0)
for broker := client.any(); broker != nil; broker = client.any() {
DebugLogger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr())

Expand All @@ -1063,6 +1067,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
return nil, err
default:
_ = broker.Close()
brokerErrors = append(brokerErrors, err)
client.deregisterBroker(broker)
continue
}
Expand Down Expand Up @@ -1096,7 +1101,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin

Logger.Println("client/coordinator no available broker to send consumer metadata request to")
client.resurrectDeadBrokers()
return retry(ErrOutOfBrokers)
return retry(Wrap(ErrOutOfBrokers, brokerErrors...))
}

// nopCloserClient embeds an existing Client, but disables
Expand Down
15 changes: 15 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
)
Expand Down Expand Up @@ -959,3 +960,17 @@ func TestClientAutorefreshShutdownRace(t *testing.T) {
// give the update time to happen so we get a panic if it's still running (which it shouldn't)
time.Sleep(10 * time.Millisecond)
}

func TestConnectionRefused(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
seedBroker.Close()

_, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
if !errors.Is(err, ErrOutOfBrokers) {
t.Fatalf("unexpected error: %v", err)
}

if !errors.Is(err, syscall.ECONNREFUSED) {
t.Fatalf("unexpected error: %v", err)
}
}
72 changes: 71 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,37 @@ var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to updat
// ErrUnknownScramMechanism is returned when user tries to AlterUserScramCredentials with unknown SCRAM mechanism
var ErrUnknownScramMechanism = errors.New("kafka: unknown SCRAM mechanism provided")

type sentinelError struct {
sentinel error
wrapped error
}

func (err sentinelError) Error() string {
if err.wrapped != nil {
return fmt.Sprintf("%s: %v", err.sentinel, err.wrapped)
} else {
return fmt.Sprintf("%s", err.sentinel)
}
}

func (err sentinelError) Is(target error) bool {
return errors.Is(err.sentinel, target) || errors.Is(err.wrapped, target)
}

func (err sentinelError) Unwrap() error {
return err.wrapped
}

func Wrap(sentinel error, wrapped ...error) sentinelError {
if len(wrapped) == 0 {
return sentinelError{sentinel, nil}
} else if len(wrapped) == 1 {
return sentinelError{sentinel, wrapped[0]}
} else {
return sentinelError{sentinel, NewMultiError(wrapped...)}
}
}

// PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
type PacketEncodingError struct {
Expand Down Expand Up @@ -92,14 +123,49 @@ type MultiError struct {
Errors *[]error
}

func (mErr MultiError) Is(target error) bool {
if mErr.Errors != nil {
for _,e := range *mErr.Errors {
if errors.Is(e, target) {
return true
}
}
}
return false
}

func (mErr MultiError) As(target interface{}) bool {
if mErr.Errors != nil {
for _,e := range *mErr.Errors {
if errors.As(e, target) {
return true
}
}
}
return false
}

func (mErr MultiError) Error() string {
errString := ""
for _, err := range *mErr.Errors {
errString += err.Error() + ","
if len(errString) > 0 {
errString += ","
}
errString += err.Error()
}
return errString
}

// The approach taken by https://github.com/hashicorp/go-multierror/blob/master/multierror.go#L85
// might be a considered (or maybe even use the dependency)?
func (mErr MultiError) Unwrap() error {
if mErr.Errors != nil && len(*mErr.Errors) > 0 {
return (*mErr.Errors)[0]
} else {
return nil
}
}

func (mErr MultiError) PrettyError() string {
errString := ""
for _, err := range *mErr.Errors {
Expand All @@ -108,6 +174,10 @@ func (mErr MultiError) PrettyError() string {
return errString
}

func NewMultiError(errs ...error) MultiError {
return MultiError{&errs}
}

// ErrDeleteRecords is the type of error returned when fail to delete the required records
type ErrDeleteRecords struct {
MultiError
Expand Down
99 changes: 99 additions & 0 deletions errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package sarama

import (
"errors"
"fmt"
"net"
"testing"
)

func TestMultiError(t *testing.T) {

myNetError := &net.OpError{Op: "mock", Err: errors.New("op error")}
myAddrError := &net.AddrError{Err: "mock addr error"}

multiError := NewMultiError(myNetError, myAddrError)

expected := "mock: op error,mock addr error"
actual := multiError.Error()
if actual != expected {
t.Errorf("unexpected value '%s' vs '%s'", expected, actual)
}

if !errors.Is(multiError, myNetError) {
t.Error("errors.Is unexpected result")
}

if !errors.Is(multiError, myAddrError) {
t.Error("errors.Is unexpected result")
}

var opError *net.OpError
if !errors.As(multiError, &opError) {
t.Error("errors.As unexpected result")
} else if opError != myNetError {
t.Error("errors.As wrong value")
}

var addrError *net.AddrError
if !errors.As(multiError, &addrError) {
t.Error("errors.As unexpected result")
} else if addrError != myAddrError {
t.Error("errors.As wrong value")
}
}

func TestSentinelWithWrappedError(t *testing.T) {
myNetError := &net.OpError{Op: "mock", Err: errors.New("op error")}
error := Wrap(ErrOutOfBrokers, myNetError)

expected := fmt.Sprintf("%s: mock: op error", ErrOutOfBrokers)
actual := error.Error()
if actual != expected {
t.Errorf("unexpected value '%s' vs '%s'", expected, actual)
}

if !errors.Is(error, ErrOutOfBrokers) {
t.Error("errors.Is unexpected result")
}

if !errors.Is(error, myNetError) {
t.Error("errors.Is unexpected result")
}

var opError *net.OpError
if !errors.As(error, &opError) {
t.Error("errors.As unexpected result")
} else if opError != myNetError {
t.Error("errors.As wrong value")
}

unwrapped := errors.Unwrap(error)
if errors.Is(unwrapped, ErrOutOfBrokers) || !errors.Is(unwrapped, myNetError) {
t.Errorf("unexpected unwrapped value %v vs %vs", error, unwrapped)
}
}

func TestSentinelWithMultipleWrappedErrors(t *testing.T) {
myNetError := &net.OpError{}
myAddrError := &net.AddrError{}

error := Wrap(ErrOutOfBrokers, myNetError, myAddrError)

if !errors.Is(error, ErrOutOfBrokers) {
t.Error("errors.Is unexpected result")
}

if !errors.Is(error, myNetError) {
t.Error("errors.Is unexpected result")
}

if !errors.Is(error, myAddrError) {
t.Error("errors.Is unexpected result")
}

unwrapped := errors.Unwrap(error)
if errors.Is(unwrapped, ErrOutOfBrokers) || !errors.Is(unwrapped, myNetError) || !errors.Is(unwrapped, myAddrError) {
t.Errorf("unwrapped value unexpected result")
}
}

0 comments on commit 2d2a020

Please sign in to comment.