From d1ecc7bfcb8979eeed1bf2638bbda343b74b2038 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 15 Feb 2021 11:13:11 -0700 Subject: [PATCH] kgo broker: on invalidly large reads, guess if this is a tls error This removes the unused-outside-of-this-one-location ErrLargeRespSize --- pkg/kgo/broker.go | 27 ++++++++++++++++++++++++++- pkg/kgo/errors.go | 16 ---------------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 64253479..8c2532e2 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -2,6 +2,7 @@ package kgo import ( "context" + "crypto/tls" "encoding/binary" "fmt" "io" @@ -759,7 +760,31 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque return } if maxSize := cxn.b.cl.cfg.maxBrokerReadBytes; size > maxSize { - err = &ErrLargeRespSize{Size: size, Limit: maxSize} + // A TLS alert is 21, and a TLS alert has the version + // following, where all major versions are 03xx. We + // look for an alert and major version byte to suspect + // if this we received a TLS alert. + tlsVersion := uint16(sizeBuf[1]) | uint16(sizeBuf[2]) + if sizeBuf[0] == 21 && tlsVersion&0x0300 != 0 { + versionGuess := fmt.Sprintf("unknown TLS version (hex %x)", tlsVersion) + for _, guess := range []struct { + num uint16 + text string + }{ + {tls.VersionSSL30, "SSL v3"}, + {tls.VersionTLS10, "TLS v1.0"}, + {tls.VersionTLS11, "TLS v1.1"}, + {tls.VersionTLS12, "TLS v1.2"}, + {tls.VersionTLS13, "TLS v1.3"}, + } { + if tlsVersion == guess.num { + versionGuess = guess.text + } + } + err = fmt.Errorf("invalid large response size %d > limit %d; the first three bytes recieved appear to be a tls alert record for %s; is this a plaintext connection speaking to a tls endpoint?", size, maxSize, versionGuess) + } else { + err = fmt.Errorf("invalid large response size %d > limit %d", size, maxSize) + } return } buf = make([]byte, size) diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index d32c3088..d073e5bb 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -123,22 +123,6 @@ type ErrDataLoss struct { ResetTo int64 } -// ErrLargeRespSize is return when Kafka replies that a response will be more -// bytes than this client allows (see the BrokerMaxReadBytes option). -// -// If this error happens, the client closes the broker connection. -type ErrLargeRespSize struct { - // The size that was replied. - Size int32 - // The limit that the size exceeded. - Limit int32 -} - -func (e *ErrLargeRespSize) Error() string { - return fmt.Sprintf("invalid large response size %d > limit %d", - e.Size, e.Limit) -} - func (e *ErrDataLoss) Error() string { return fmt.Sprintf("topic %s partition %d lost records;"+ " the client consumed to offset %d but was reset to offset %d",