From 5fb0a3831f56e444497131674b582113344704db Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 10 Oct 2020 19:55:51 -0600 Subject: [PATCH] broker: fix SaslAuthenticate, all: minor cleanliness Turns out the hardcoded sasl authenticate key was wrong (36 vs 37). I'm not sure how this worked on my local cluster with plain & scram. While looking into that... this commit fixes that, avoids hardcoding keys in broker (while keeping two keys hardcoded in client), and makes nicer some log messages. Lastly, for KIP-368, after looking at the PR, it looks like the first step when reauthenticating is to handshake. So, this switches doSasl() to sasl() to go to handshake first. --- pkg/kgo/broker.go | 28 ++++++++++++++-------------- pkg/kgo/client.go | 9 +++------ pkg/kversion/kversion.go | 2 +- 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 48d5141d..442367ec 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -272,7 +272,7 @@ func (b *broker) handleReqs() { // can only have an expiry if we went the authenticate // flow, so we know we are authenticating again. // For KIP-368. - if err = cxn.doSasl(true); err != nil { + if err = cxn.sasl(); err != nil { pr.promise(nil, err) cxn.die() continue @@ -353,9 +353,11 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn, sasls: b.cl.cfg.sasls, } if err = cxn.init(b.cl.cfg.maxVersions); err != nil { + b.cl.cfg.logger.Log(LogLevelDebug, "connection initialization failed", "addr", b.addr, "id", b.id, "err", err) conn.Close() return nil, err } + b.cl.cfg.logger.Log(LogLevelDebug, "connection initialized successfully", "addr", b.addr, "id", b.id) *pcxn = cxn return cxn, nil @@ -429,7 +431,6 @@ func (cxn *brokerCxn) init(maxVersions kversion.Versions) error { cxn.resps = make(chan promisedResp, 10) go cxn.handleResps() - cxn.l.Log(LogLevelDebug, "connection initialized successfully") return nil } @@ -499,15 +500,13 @@ func (cxn *brokerCxn) sasl() error { mechanism := cxn.sasls[0] retried := false authenticate := false - const handshakeKey = 17 + req := new(kmsg.SASLHandshakeRequest) start: - if mechanism.Name() != "GSSAPI" && cxn.versions[handshakeKey] >= 0 { - req := &kmsg.SASLHandshakeRequest{ - Version: cxn.versions[handshakeKey], - Mechanism: mechanism.Name(), - } - cxn.l.Log(LogLevelDebug, "writing SASLHandshakeRequest") + if mechanism.Name() != "GSSAPI" && cxn.versions[req.Key()] >= 0 { + req.Mechanism = mechanism.Name() + req.Version = cxn.versions[req.Key()] + cxn.l.Log(LogLevelDebug, "issuing SASLHandshakeRequest") corrID, err := cxn.writeRequest(req) if err != nil { return err @@ -563,12 +562,11 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { // We continue writing until both the challenging is done AND the // responses are done. We can have an additional response once we // are done with challenges. - step := 0 + step := -1 for done := false; !done || len(clientWrite) > 0; { + step++ var challenge []byte - cxn.l.Log(LogLevelDebug, "issuing authentication step", "sasl_authenticate_request_envelope", authenticate, "step", step) - step++ if !authenticate { buf := cxn.bufPool.get() @@ -579,6 +577,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { if wt > 0 { cxn.conn.SetWriteDeadline(time.Now().Add(wt)) } + cxn.l.Log(LogLevelDebug, "issuing raw sasl authenticate", "step", step) _, err = cxn.conn.Write(buf) if wt > 0 { cxn.conn.SetWriteDeadline(time.Time{}) @@ -596,11 +595,12 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { } } else { - const authenticateKey = 37 req := &kmsg.SASLAuthenticateRequest{ - Version: cxn.versions[authenticateKey], SASLAuthBytes: clientWrite, } + req.Version = cxn.versions[req.Key()] + cxn.l.Log(LogLevelDebug, "issuing SASLAuthenticate", "version", req.Version, "step", step) + corrID, err := cxn.writeRequest(req) if err != nil { return err diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index bb259b29..5a606532 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -283,9 +283,8 @@ func (cl *Client) fetchMetadata(ctx context.Context, all bool, topics []string) topics = []string{} } tries := 0 - const key = 3 // metadata request key tryStart := time.Now() - retryTimeout := cl.cfg.retryTimeout(key) + retryTimeout := cl.cfg.retryTimeout(3) // 3 is metadata request key start: tries++ broker := cl.broker() @@ -469,8 +468,7 @@ func (cl *Client) request(ctx context.Context, req kmsg.Request) (kmsg.Response, var err error tries := 0 tryStart := time.Now() - key := req.Key() - retryTimeout := cl.cfg.retryTimeout(key) + retryTimeout := cl.cfg.retryTimeout(req.Key()) start: tries++ if metaReq, isMetaReq := req.(*kmsg.MetadataRequest); isMetaReq { @@ -571,10 +569,9 @@ func (cl *Client) loadCoordinator(ctx context.Context, key coordinatorKey) (*bro } } - const reqKey = 10 tries := 0 tryStart := time.Now() - retryTimeout := cl.cfg.retryTimeout(reqKey) + retryTimeout := cl.cfg.retryTimeout(10) // 10 is find coordinator key start: cl.coordinatorsMu.Lock() coordinator, ok := cl.coordinators[key] diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go index bf09d20d..0f7dde26 100644 --- a/pkg/kversion/kversion.go +++ b/pkg/kversion/kversion.go @@ -396,7 +396,7 @@ func Tip() Versions { v[25]++ // 2 add offsets to txn v[26]++ // 2 end txn - v[18]++ // KAFKA-10027 4f96c5b424956355339dd3216c426c1c0388fe9e KIP-584 + v[18]++ // 4 api versions KAFKA-10027 4f96c5b424956355339dd3216c426c1c0388fe9e KIP-584 v = append(v, 0, // 50 describe user scram creds, KAFKA-10259 e8524ccd8fca0caac79b844d87e98e9c055f76fb KIP-554