Skip to content

Commit

Permalink
fix: make ErrOutOfBrokers wrap underlying net err
Browse files Browse the repository at this point in the history
Ensure that ErrOutOfBrokers exception(s) include the actual underlying
error that prevented successful connection(s) to the brokers.

Fixes IBM#2128
  • Loading branch information
k-wall committed Feb 24, 2022
1 parent 8f626c8 commit 5fdeb77
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 53 deletions.
4 changes: 2 additions & 2 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][
}

if len(errs) > 0 {
return ErrReassignPartitions{MultiError{&errs}}
return Wrap(ErrReassignPartitions, errs...)
}

return nil
Expand Down Expand Up @@ -604,7 +604,7 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i
}
}
if len(errs) > 0 {
return ErrDeleteRecords{MultiError{&errs}}
return Wrap(ErrDeleteRecords, errs...)
}
// TODO: since we are dealing with a couple of partitions, it would be good to return a slice of errors
// for each partition instead of one error
Expand Down
14 changes: 7 additions & 7 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,20 +639,20 @@ func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
partitionOffset[3] = 1000

err = admin.DeleteRecords(topicName, partitionOffset)
if err == nil {
t.Fatal("expected an ErrDeleteRecords")
}

if !strings.HasPrefix(err.Error(), "kafka server: failed to delete records") {
t.Fatal(err)
}
var deleteRecordsError ErrDeleteRecords
ok := errors.As(err, &deleteRecordsError)

if !ok {
if !errors.Is(err, ErrDeleteRecords) {
t.Fatal(err)
}

for _, err := range *deleteRecordsError.Errors {
if !errors.Is(err, ErrUnsupportedVersion) {
t.Fatal(err)
}
if !errors.Is(err, ErrUnsupportedVersion) {
t.Fatal(err)
}

err = admin.Close()
Expand Down
18 changes: 12 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,23 +212,24 @@ func (client *client) Broker(brokerID int32) (*Broker, error) {
}

func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
err := ErrOutOfBrokers
brokerErrors := make([]error, 0)
for broker := client.any(); broker != nil; broker = client.any() {
var response *InitProducerIDResponse
req := &InitProducerIDRequest{}

response, err = broker.InitProducerID(req)
response, err := broker.InitProducerID(req)
if err == nil {
return response, nil
} else {
// some error, remove that broker and try again
Logger.Printf("Client got error from broker %d when issuing InitProducerID : %v\n", broker.ID(), err)
_ = broker.Close()
brokerErrors = append(brokerErrors, err)
client.deregisterBroker(broker)
}
}

return nil, err
return nil, Wrap(ErrOutOfBrokers, brokerErrors...)
}

func (client *client) Close() error {
Expand Down Expand Up @@ -875,6 +876,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
}

broker := client.any()
brokerErrors := make([]error, 0)
for ; broker != nil && !pastDeadline(0); broker = client.any() {
allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation
if len(topics) > 0 {
Expand Down Expand Up @@ -923,19 +925,21 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
} else {
// some other error, remove that broker and try again
Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
brokerErrors = append(brokerErrors, err)
_ = broker.Close()
client.deregisterBroker(broker)
}
}

error := Wrap(ErrOutOfBrokers, brokerErrors...)
if broker != nil {
Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
return retry(ErrOutOfBrokers)
return retry(error)
}

Logger.Println("client/metadata no available broker to send metadata request to")
client.resurrectDeadBrokers()
return retry(ErrOutOfBrokers)
return retry(error)
}

// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
Expand Down Expand Up @@ -1042,6 +1046,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
return nil, err
}

brokerErrors := make([]error, 0)
for broker := client.any(); broker != nil; broker = client.any() {
DebugLogger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr())

Expand All @@ -1058,6 +1063,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
return nil, err
} else {
_ = broker.Close()
brokerErrors = append(brokerErrors, err)
client.deregisterBroker(broker)
continue
}
Expand Down Expand Up @@ -1088,7 +1094,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin

Logger.Println("client/coordinator no available broker to send consumer metadata request to")
client.resurrectDeadBrokers()
return retry(ErrOutOfBrokers)
return retry(Wrap(ErrOutOfBrokers, brokerErrors...))
}

// nopCloserClient embeds an existing Client, but disables
Expand Down
72 changes: 72 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
)
Expand Down Expand Up @@ -997,3 +998,74 @@ func TestClientAutorefreshShutdownRace(t *testing.T) {
// give the update time to happen so we get a panic if it's still running (which it shouldn't)
time.Sleep(10 * time.Millisecond)
}

// TestClientConnectionRefused closes the mock seed broker before creating a
// client and expects the resulting error to match both ErrOutOfBrokers and
// the underlying syscall.ECONNREFUSED.
func TestClientConnectionRefused(t *testing.T) {
	t.Parallel()
	seedBroker := NewMockBroker(t, 1)
	seedBroker.Close()

	_, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())

	// Both the sentinel and the network-level cause must be discoverable.
	for _, want := range []error{ErrOutOfBrokers, syscall.ECONNREFUSED} {
		if !errors.Is(err, want) {
			t.Fatalf("unexpected error: %v", err)
		}
	}
}

// TestClientCoordinatorConnectionRefused creates a client against a mock seed
// broker, closes that broker, and then expects Coordinator to fail with an
// error matching both ErrOutOfBrokers and syscall.ECONNREFUSED.
func TestClientCoordinatorConnectionRefused(t *testing.T) {
	t.Parallel()
	seedBroker := NewMockBroker(t, 1)
	seedBroker.Returns(new(MetadataResponse))

	client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
	if err != nil {
		t.Fatal(err)
	}
	// Deferred so the client is also closed when an assertion below calls
	// t.Fatalf, which would otherwise skip a trailing close.
	defer safeClose(t, client)

	seedBroker.Close()

	_, err = client.Coordinator("my_group")

	if !errors.Is(err, ErrOutOfBrokers) {
		t.Fatalf("unexpected error: %v", err)
	}

	if !errors.Is(err, syscall.ECONNREFUSED) {
		t.Fatalf("unexpected error: %v", err)
	}
}

// TestInitProducerIDConnectionRefused creates an idempotent-producer client
// against a mock seed broker, closes that broker, and then expects
// InitProducerID to fail with an error matching both ErrOutOfBrokers and
// io.EOF.
func TestInitProducerIDConnectionRefused(t *testing.T) {
	t.Parallel()
	seedBroker := NewMockBroker(t, 1)
	seedBroker.Returns(&MetadataResponse{Version: 1})

	config := NewTestConfig()
	config.Producer.Idempotent = true
	config.Version = V0_11_0_0
	config.Producer.RequiredAcks = WaitForAll
	config.Net.MaxOpenRequests = 1

	client, err := NewClient([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	// Deferred so the client is also closed when an assertion below calls
	// t.Fatalf, which would otherwise skip a trailing close.
	defer safeClose(t, client)

	seedBroker.Close()

	_, err = client.InitProducerID()

	if !errors.Is(err, ErrOutOfBrokers) {
		t.Fatalf("unexpected error: %v", err)
	}

	if !errors.Is(err, io.EOF) {
		t.Fatalf("unexpected error: %v", err)
	}
}
82 changes: 44 additions & 38 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package sarama
import (
"errors"
"fmt"

"github.com/hashicorp/go-multierror"
)

// ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored
Expand Down Expand Up @@ -55,6 +57,48 @@ var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to updat
// ErrUnknownScramMechanism is returned when user tries to AlterUserScramCredentials with unknown SCRAM mechanism
var ErrUnknownScramMechanism = errors.New("kafka: unknown SCRAM mechanism provided")

// ErrReassignPartitions is returned when altering partition assignments for a topic fails
var ErrReassignPartitions = errors.New("failed to reassign partitions for topic")

// ErrDeleteRecords is the error returned when deleting the required records fails
var ErrDeleteRecords = errors.New("kafka server: failed to delete records")

// MultiErrorFormat is the formatter used to format multierrors. When it is nil,
// the ErrorFormat of the multierror built by multiError is left unset.
var MultiErrorFormat multierror.ErrorFormatFunc

// sentinelError pairs a sentinel error with an optional wrapped error that
// carries the underlying cause.
type sentinelError struct {
	sentinel error
	wrapped  error
}

// Error renders the sentinel's message, followed by ": " and the wrapped
// error when one is present.
func (e sentinelError) Error() string {
	if e.wrapped == nil {
		return fmt.Sprintf("%s", e.sentinel)
	}
	return fmt.Sprintf("%s: %v", e.sentinel, e.wrapped)
}

// Is reports whether target matches either the sentinel or the wrapped error.
func (e sentinelError) Is(target error) bool {
	if errors.Is(e.sentinel, target) {
		return true
	}
	return errors.Is(e.wrapped, target)
}

// Unwrap returns the wrapped error; the sentinel is not part of the chain.
func (e sentinelError) Unwrap() error {
	return e.wrapped
}

// Wrap returns a sentinelError whose sentinel is the given sentinel and whose
// wrapped error is the result of combining the given errors with multiError.
func Wrap(sentinel error, wrapped ...error) sentinelError {
	combined := multiError(wrapped...)
	return sentinelError{wrapped: combined, sentinel: sentinel}
}

// multiError appends the given errors into a single multierror, applies
// MultiErrorFormat when one is set, and returns the result of ErrorOrNil.
func multiError(wrapped ...error) error {
	combined := multierror.Append(nil, wrapped...)
	if MultiErrorFormat == nil {
		return combined.ErrorOrNil()
	}
	combined.ErrorFormat = MultiErrorFormat
	return combined.ErrorOrNil()
}

// PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
type PacketEncodingError struct {
Expand Down Expand Up @@ -87,44 +131,6 @@ func (err ConfigurationError) Error() string {
// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
type KError int16

// MultiError holds a collection of errors behind a pointer to a slice.
type MultiError struct {
	Errors *[]error
}

// Error concatenates the message of every contained error, writing a comma
// after each one (including the last).
func (m MultiError) Error() string {
	var buf []byte
	for _, e := range *m.Errors {
		buf = append(buf, e.Error()...)
		buf = append(buf, ',')
	}
	return string(buf)
}

// PrettyError concatenates the message of every contained error, writing a
// newline after each one (including the last).
func (m MultiError) PrettyError() string {
	var buf []byte
	for _, e := range *m.Errors {
		buf = append(buf, e.Error()...)
		buf = append(buf, '\n')
	}
	return string(buf)
}

// ErrDeleteRecords is the error type returned when deleting the required
// records fails; it embeds the MultiError holding the individual errors.
type ErrDeleteRecords struct {
	MultiError
}

// Error prefixes the embedded MultiError's message with a fixed description.
func (e ErrDeleteRecords) Error() string {
	details := e.MultiError.Error()
	return "kafka server: failed to delete records " + details
}

// ErrReassignPartitions is an error type that embeds MultiError; its message
// is a fixed prefix followed by the MultiError's PrettyError output.
type ErrReassignPartitions struct {
	MultiError
}

// Error renders the fixed prefix and then the PrettyError listing.
func (e ErrReassignPartitions) Error() string {
	details := e.PrettyError()
	return fmt.Sprintf("failed to reassign partitions for topic: \n%s", details)
}

// Numeric error codes returned by the Kafka server.
const (
ErrNoError KError = 0
Expand Down
65 changes: 65 additions & 0 deletions errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package sarama

import (
"errors"
"fmt"
"net"
"testing"
)

// TestSentinelWithWrappedError wraps a single net.OpError under
// ErrOutOfBrokers and checks the rendered message as well as the behaviour of
// errors.Is, errors.As and errors.Unwrap on the result.
func TestSentinelWithWrappedError(t *testing.T) {
	t.Parallel()
	myNetError := &net.OpError{Op: "mock", Err: errors.New("op error")}
	// Named wrappedErr rather than "error" to avoid shadowing the predeclared type.
	wrappedErr := Wrap(ErrOutOfBrokers, myNetError)

	expected := fmt.Sprintf("%s: 1 error occurred:\n\t* %s\n\n", ErrOutOfBrokers, myNetError)
	actual := wrappedErr.Error()
	if actual != expected {
		t.Errorf("unexpected value '%s' vs '%v'", expected, actual)
	}

	if !errors.Is(wrappedErr, ErrOutOfBrokers) {
		t.Error("errors.Is unexpected result")
	}

	if !errors.Is(wrappedErr, myNetError) {
		t.Error("errors.Is unexpected result")
	}

	var opError *net.OpError
	if !errors.As(wrappedErr, &opError) {
		t.Error("errors.As unexpected result")
	} else if opError != myNetError {
		t.Error("errors.As wrong value")
	}

	// Unwrapping must drop the sentinel and keep only the underlying cause.
	unwrapped := errors.Unwrap(wrappedErr)
	if errors.Is(unwrapped, ErrOutOfBrokers) || !errors.Is(unwrapped, myNetError) {
		t.Errorf("unexpected unwrapped value %v vs %v", wrappedErr, unwrapped)
	}
}

// TestSentinelWithMultipleWrappedErrors wraps two different network errors
// under ErrOutOfBrokers and checks that errors.Is finds the sentinel and both
// causes, and that the unwrapped value no longer matches the sentinel.
func TestSentinelWithMultipleWrappedErrors(t *testing.T) {
	t.Parallel()
	myNetError := &net.OpError{}
	myAddrError := &net.AddrError{}

	// Named wrappedErr rather than "error" to avoid shadowing the predeclared type.
	wrappedErr := Wrap(ErrOutOfBrokers, myNetError, myAddrError)

	if !errors.Is(wrappedErr, ErrOutOfBrokers) {
		t.Error("errors.Is unexpected result")
	}

	if !errors.Is(wrappedErr, myNetError) {
		t.Error("errors.Is unexpected result")
	}

	if !errors.Is(wrappedErr, myAddrError) {
		t.Error("errors.Is unexpected result")
	}

	unwrapped := errors.Unwrap(wrappedErr)
	if errors.Is(unwrapped, ErrOutOfBrokers) || !errors.Is(unwrapped, myNetError) || !errors.Is(unwrapped, myAddrError) {
		t.Errorf("unwrapped value unexpected result")
	}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/fortytw2/leaktest v1.3.0
github.com/frankban/quicktest v1.14.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jcmturner/gofork v1.0.0
github.com/jcmturner/gokrb5/v8 v8.4.2
github.com/klauspost/compress v1.14.4
Expand Down
Loading

0 comments on commit 5fdeb77

Please sign in to comment.