Skip to content

Commit

Permalink
fix minor Microsoft EventHubs issues
Browse files Browse the repository at this point in the history
See embedded comments; Kafka actually does not document either behavior
that obviously, so it is what it is.

For the nullable bytes thing, this is most clearly visible in the
autogenerated code; exceptions are thrown if the length value is
negative for non-nullable fields. The protocol does not clearly
explicitly say that negatives are invalid for non-nullable bytes fields.

For the api versions, this is visible in the Kafka code and mentioned in
passing in a KIP or two. The KIP bit is definitely not easy to find, but
the code bit is.
  • Loading branch information
twmb committed Oct 27, 2020
1 parent 9eef141 commit 0ddc468
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
12 changes: 12 additions & 0 deletions pkg/kbin/primitives.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,15 @@ func (b *Reader) CompactNullableString() *string {
// This never returns nil.
func (b *Reader) Bytes() []byte {
l := b.Int32()
// This is not to spec, but it is not clearly documented and Microsoft
// EventHubs fails here. -1 means null, which should throw an
// exception. EventHubs uses -1 to mean "does not exist" on some
// non-nullable fields.
//
// Until EventHubs is fixed, we return an empty byte slice for null.
if l == -1 {
return []byte{}
}
return b.Span(int(l))
}

Expand All @@ -483,6 +492,9 @@ func (b *Reader) Bytes() []byte {
// This never returns nil.
func (b *Reader) CompactBytes() []byte {
l := int(b.Uvarint()) - 1
if l == -1 { // same as above: -1 should not be allowed here
return []byte{}
}
return b.Span(int(l))
}

Expand Down
15 changes: 9 additions & 6 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/twmb/franz-go/pkg/kbin"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/kversion"
"github.com/twmb/franz-go/pkg/sasl"
)

Expand Down Expand Up @@ -392,7 +391,7 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn,
addr: b.addr,
conn: conn,
}
if err = cxn.init(b.cl.cfg.maxVersions); err != nil {
if err = cxn.init(); err != nil {
b.cl.cfg.logger.Log(LogLevelDebug, "connection initialization failed", "addr", b.addr, "id", b.meta.NodeID, "err", err)
cxn.closeConn()
return nil, err
Expand Down Expand Up @@ -450,12 +449,12 @@ type brokerCxn struct {
dead int32
}

func (cxn *brokerCxn) init(maxVersions kversion.Versions) error {
func (cxn *brokerCxn) init() error {
for i := 0; i < len(cxn.versions[:]); i++ {
cxn.versions[i] = -1
}

if maxVersions == nil || len(maxVersions) >= 19 {
if cxn.b.cl.cfg.maxVersions == nil || len(cxn.b.cl.cfg.maxVersions) >= 19 {
if err := cxn.requestAPIVersions(); err != nil {
cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "err", err)
return err
Expand Down Expand Up @@ -506,7 +505,11 @@ start:
if maxVersion == 0 {
return ErrConnDead
}
if string(rawResp) == "\x00\x23\x00\x00\x00\x00" {
srawResp := string(rawResp)
if srawResp == "\x00\x23\x00\x00\x00\x00" ||
// EventHubs erroneously replies with v1, so we check
// for that as well.
srawResp == "\x00\x23\x00\x00\x00\x00\x00\x00\x00\x00" {
cxn.cl.cfg.logger.Log(LogLevelDebug, "kafka does not know our ApiVersions version, downgrading to version 0 and retrying")
maxVersion = 0
goto start
Expand Down Expand Up @@ -577,7 +580,7 @@ start:
}
authenticate = req.Version == 1
}
cxn.cl.cfg.logger.Log(LogLevelDebug, "beginning sasl authentication", "mechanism", mechanism.Name())
cxn.cl.cfg.logger.Log(LogLevelDebug, "beginning sasl authentication", "mechanism", mechanism.Name(), "authenticate", authenticate)
cxn.mechanism = mechanism
return cxn.doSasl(authenticate)
}
Expand Down

0 comments on commit 0ddc468

Please sign in to comment.