Skip to content

Commit

Permalink
fix: make ErrOutOfBrokers wrap underlying net err
Browse files Browse the repository at this point in the history
Ensure that ErrOutOfBrokers exception(s) include the actual underlying
error that prevented successful connection(s) to the brokers.

Fixes IBM#2128
  • Loading branch information
k-wall committed Feb 10, 2022
1 parent df3aa4d commit 86767a3
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 4 deletions.
11 changes: 8 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
}

broker := client.any()
brokerErrors := make([]error, 0)
for ; broker != nil && !pastDeadline(0); broker = client.any() {
allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation
if len(topics) > 0 {
Expand Down Expand Up @@ -928,19 +929,21 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
default:
// some other error, remove that broker and try again
Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
brokerErrors = append(brokerErrors, err)
_ = broker.Close()
client.deregisterBroker(broker)
}
}

error := Wrap(ErrOutOfBrokers, brokerErrors...)
if broker != nil {
Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
return retry(ErrOutOfBrokers)
return retry(error)
}

Logger.Println("client/metadata no available broker to send metadata request to")
client.resurrectDeadBrokers()
return retry(ErrOutOfBrokers)
return retry(error)
}

// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
Expand Down Expand Up @@ -1047,6 +1050,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
return nil, err
}

brokerErrors := make([]error, 0)
for broker := client.any(); broker != nil; broker = client.any() {
DebugLogger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr())

Expand All @@ -1063,6 +1067,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
return nil, err
default:
_ = broker.Close()
brokerErrors = append(brokerErrors, err)
client.deregisterBroker(broker)
continue
}
Expand Down Expand Up @@ -1096,7 +1101,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin

Logger.Println("client/coordinator no available broker to send consumer metadata request to")
client.resurrectDeadBrokers()
return retry(ErrOutOfBrokers)
return retry(Wrap(ErrOutOfBrokers, brokerErrors...))
}

// nopCloserClient embeds an existing Client, but disables
Expand Down
39 changes: 39 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
)
Expand Down Expand Up @@ -997,3 +998,41 @@ func TestClientAutorefreshShutdownRace(t *testing.T) {
// give the update time to happen so we get a panic if it's still running (which it shouldn't)
time.Sleep(10 * time.Millisecond)
}

func TestClientConnectionRefused(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
seedBroker.Close()

_, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
if !errors.Is(err, ErrOutOfBrokers) {
t.Fatalf("unexpected error: %v", err)
}

if !errors.Is(err, syscall.ECONNREFUSED) {
t.Fatalf("unexpected error: %v", err)
}
}

func TestClientCoordinatorConnectionRefused(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
seedBroker.Returns(new(MetadataResponse))

client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
if err != nil {
t.Fatal(err)
}

seedBroker.Close()

_, err = client.Coordinator("my_group")

if !errors.Is(err, ErrOutOfBrokers) {
t.Fatalf("unexpected error: %v", err)
}

if !errors.Is(err, syscall.ECONNREFUSED) {
t.Fatalf("unexpected error: %v", err)
}

safeClose(t, client)
}
72 changes: 71 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,37 @@ var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to updat
// ErrUnknownScramMechanism is returned when user tries to AlterUserScramCredentials with unknown SCRAM mechanism
var ErrUnknownScramMechanism = errors.New("kafka: unknown SCRAM mechanism provided")

type sentinelError struct {
sentinel error
wrapped error
}

func (err sentinelError) Error() string {
if err.wrapped != nil {
return fmt.Sprintf("%s: %v", err.sentinel, err.wrapped)
} else {
return fmt.Sprintf("%s", err.sentinel)
}
}

func (err sentinelError) Is(target error) bool {
return errors.Is(err.sentinel, target) || errors.Is(err.wrapped, target)
}

func (err sentinelError) Unwrap() error {
return err.wrapped
}

func Wrap(sentinel error, wrapped ...error) sentinelError {
if len(wrapped) == 0 {
return sentinelError{sentinel, nil}
} else if len(wrapped) == 1 {
return sentinelError{sentinel, wrapped[0]}
} else {
return sentinelError{sentinel, NewMultiError(wrapped...)}
}
}

// PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
type PacketEncodingError struct {
Expand Down Expand Up @@ -92,14 +123,49 @@ type MultiError struct {
Errors *[]error
}

func (mErr MultiError) Is(target error) bool {
if mErr.Errors != nil {
for _, e := range *mErr.Errors {
if errors.Is(e, target) {
return true
}
}
}
return false
}

func (mErr MultiError) As(target interface{}) bool {
if mErr.Errors != nil {
for _, e := range *mErr.Errors {
if errors.As(e, target) {
return true
}
}
}
return false
}

func (mErr MultiError) Error() string {
errString := ""
for _, err := range *mErr.Errors {
errString += err.Error() + ","
if len(errString) > 0 {
errString += ","
}
errString += err.Error()
}
return errString
}

// The approach taken by https://github.com/hashicorp/go-multierror/blob/master/multierror.go#L85
// might be a considered (or maybe even use the dependency)?
func (mErr MultiError) Unwrap() error {
if mErr.Errors != nil && len(*mErr.Errors) > 0 {
return (*mErr.Errors)[0]
} else {
return nil
}
}

func (mErr MultiError) PrettyError() string {
errString := ""
for _, err := range *mErr.Errors {
Expand All @@ -108,6 +174,10 @@ func (mErr MultiError) PrettyError() string {
return errString
}

func NewMultiError(errs ...error) MultiError {
return MultiError{&errs}
}

// ErrDeleteRecords is the type of error returned when fail to delete the required records
type ErrDeleteRecords struct {
MultiError
Expand Down
98 changes: 98 additions & 0 deletions errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package sarama

import (
"errors"
"fmt"
"net"
"testing"
)

func TestMultiError(t *testing.T) {
myNetError := &net.OpError{Op: "mock", Err: errors.New("op error")}
myAddrError := &net.AddrError{Err: "mock addr error"}

multiError := NewMultiError(myNetError, myAddrError)

expected := "mock: op error,mock addr error"
actual := multiError.Error()
if actual != expected {
t.Errorf("unexpected value '%s' vs '%s'", expected, actual)
}

if !errors.Is(multiError, myNetError) {
t.Error("errors.Is unexpected result")
}

if !errors.Is(multiError, myAddrError) {
t.Error("errors.Is unexpected result")
}

var opError *net.OpError
if !errors.As(multiError, &opError) {
t.Error("errors.As unexpected result")
} else if opError != myNetError {
t.Error("errors.As wrong value")
}

var addrError *net.AddrError
if !errors.As(multiError, &addrError) {
t.Error("errors.As unexpected result")
} else if addrError != myAddrError {
t.Error("errors.As wrong value")
}
}

func TestSentinelWithWrappedError(t *testing.T) {
myNetError := &net.OpError{Op: "mock", Err: errors.New("op error")}
error := Wrap(ErrOutOfBrokers, myNetError)

expected := fmt.Sprintf("%s: mock: op error", ErrOutOfBrokers)
actual := error.Error()
if actual != expected {
t.Errorf("unexpected value '%s' vs '%s'", expected, actual)
}

if !errors.Is(error, ErrOutOfBrokers) {
t.Error("errors.Is unexpected result")
}

if !errors.Is(error, myNetError) {
t.Error("errors.Is unexpected result")
}

var opError *net.OpError
if !errors.As(error, &opError) {
t.Error("errors.As unexpected result")
} else if opError != myNetError {
t.Error("errors.As wrong value")
}

unwrapped := errors.Unwrap(error)
if errors.Is(unwrapped, ErrOutOfBrokers) || !errors.Is(unwrapped, myNetError) {
t.Errorf("unexpected unwrapped value %v vs %vs", error, unwrapped)
}
}

func TestSentinelWithMultipleWrappedErrors(t *testing.T) {
myNetError := &net.OpError{}
myAddrError := &net.AddrError{}

error := Wrap(ErrOutOfBrokers, myNetError, myAddrError)

if !errors.Is(error, ErrOutOfBrokers) {
t.Error("errors.Is unexpected result")
}

if !errors.Is(error, myNetError) {
t.Error("errors.Is unexpected result")
}

if !errors.Is(error, myAddrError) {
t.Error("errors.Is unexpected result")
}

unwrapped := errors.Unwrap(error)
if errors.Is(unwrapped, ErrOutOfBrokers) || !errors.Is(unwrapped, myNetError) || !errors.Is(unwrapped, myAddrError) {
t.Errorf("unwrapped value unexpected result")
}
}

0 comments on commit 86767a3

Please sign in to comment.