Skip to content

Commit

Permalink
kgo broker: on invalidly large reads, guess if this is a tls error
Browse files Browse the repository at this point in the history
This removes the unused-outside-of-this-one-location ErrLargeRespSize
  • Loading branch information
twmb committed Feb 15, 2021
1 parent 62abaaf commit d1ecc7b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 17 deletions.
27 changes: 26 additions & 1 deletion pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kgo

import (
"context"
"crypto/tls"
"encoding/binary"
"fmt"
"io"
Expand Down Expand Up @@ -759,7 +760,31 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque
return
}
if maxSize := cxn.b.cl.cfg.maxBrokerReadBytes; size > maxSize {
err = &ErrLargeRespSize{Size: size, Limit: maxSize}
// A TLS alert is 21, and a TLS alert has the version
// following, where all major versions are 03xx. We
// look for an alert and major version byte to suspect
// if this we received a TLS alert.
tlsVersion := uint16(sizeBuf[1]) | uint16(sizeBuf[2])
if sizeBuf[0] == 21 && tlsVersion&0x0300 != 0 {
versionGuess := fmt.Sprintf("unknown TLS version (hex %x)", tlsVersion)
for _, guess := range []struct {
num uint16
text string
}{
{tls.VersionSSL30, "SSL v3"},
{tls.VersionTLS10, "TLS v1.0"},
{tls.VersionTLS11, "TLS v1.1"},
{tls.VersionTLS12, "TLS v1.2"},
{tls.VersionTLS13, "TLS v1.3"},
} {
if tlsVersion == guess.num {
versionGuess = guess.text
}
}
err = fmt.Errorf("invalid large response size %d > limit %d; the first three bytes recieved appear to be a tls alert record for %s; is this a plaintext connection speaking to a tls endpoint?", size, maxSize, versionGuess)
} else {
err = fmt.Errorf("invalid large response size %d > limit %d", size, maxSize)
}
return
}
buf = make([]byte, size)
Expand Down
16 changes: 0 additions & 16 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,22 +123,6 @@ type ErrDataLoss struct {
ResetTo int64
}

// ErrLargeRespSize is return when Kafka replies that a response will be more
// bytes than this client allows (see the BrokerMaxReadBytes option).
//
// If this error happens, the client closes the broker connection.
type ErrLargeRespSize struct {
// The size that was replied.
Size int32
// The limit that the size exceeded.
Limit int32
}

func (e *ErrLargeRespSize) Error() string {
return fmt.Sprintf("invalid large response size %d > limit %d",
e.Size, e.Limit)
}

func (e *ErrDataLoss) Error() string {
return fmt.Sprintf("topic %s partition %d lost records;"+
" the client consumed to offset %d but was reset to offset %d",
Expand Down

0 comments on commit d1ecc7b

Please sign in to comment.