Skip to content

Commit

Permalink
broker: fix SaslAuthenticate, all: minor cleanliness
Browse files Browse the repository at this point in the history
Turns out the hardcoded sasl authenticate key was wrong (36 vs 37). I'm
not sure how this worked on my local cluster with plain & scram. While
looking into that... this commit fixes that, avoids hardcoding keys in
broker (while keeping two keys hardcoded in client), and makes nicer
some log messages.

Lastly, for KIP-368, after looking at the PR, it looks like the first
step when reauthenticating is to handshake. So, this switches doSasl()
to sasl() to go to handshake first.
  • Loading branch information
twmb committed Oct 11, 2020
1 parent 85209fb commit 5fb0a38
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 21 deletions.
28 changes: 14 additions & 14 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (b *broker) handleReqs() {
// can only have an expiry if we went the authenticate
// flow, so we know we are authenticating again.
// For KIP-368.
if err = cxn.doSasl(true); err != nil {
if err = cxn.sasl(); err != nil {
pr.promise(nil, err)
cxn.die()
continue
Expand Down Expand Up @@ -353,9 +353,11 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn,
sasls: b.cl.cfg.sasls,
}
if err = cxn.init(b.cl.cfg.maxVersions); err != nil {
b.cl.cfg.logger.Log(LogLevelDebug, "connection initialization failed", "addr", b.addr, "id", b.id, "err", err)
conn.Close()
return nil, err
}
b.cl.cfg.logger.Log(LogLevelDebug, "connection initialized successfully", "addr", b.addr, "id", b.id)

*pcxn = cxn
return cxn, nil
Expand Down Expand Up @@ -429,7 +431,6 @@ func (cxn *brokerCxn) init(maxVersions kversion.Versions) error {

cxn.resps = make(chan promisedResp, 10)
go cxn.handleResps()
cxn.l.Log(LogLevelDebug, "connection initialized successfully")
return nil
}

Expand Down Expand Up @@ -499,15 +500,13 @@ func (cxn *brokerCxn) sasl() error {
mechanism := cxn.sasls[0]
retried := false
authenticate := false
const handshakeKey = 17

req := new(kmsg.SASLHandshakeRequest)
start:
if mechanism.Name() != "GSSAPI" && cxn.versions[handshakeKey] >= 0 {
req := &kmsg.SASLHandshakeRequest{
Version: cxn.versions[handshakeKey],
Mechanism: mechanism.Name(),
}
cxn.l.Log(LogLevelDebug, "writing SASLHandshakeRequest")
if mechanism.Name() != "GSSAPI" && cxn.versions[req.Key()] >= 0 {
req.Mechanism = mechanism.Name()
req.Version = cxn.versions[req.Key()]
cxn.l.Log(LogLevelDebug, "issuing SASLHandshakeRequest")
corrID, err := cxn.writeRequest(req)
if err != nil {
return err
Expand Down Expand Up @@ -563,12 +562,11 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
// We continue writing until both the challenging is done AND the
// responses are done. We can have an additional response once we
// are done with challenges.
step := 0
step := -1
for done := false; !done || len(clientWrite) > 0; {
step++
var challenge []byte

cxn.l.Log(LogLevelDebug, "issuing authentication step", "sasl_authenticate_request_envelope", authenticate, "step", step)
step++
if !authenticate {
buf := cxn.bufPool.get()

Expand All @@ -579,6 +577,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
if wt > 0 {
cxn.conn.SetWriteDeadline(time.Now().Add(wt))
}
cxn.l.Log(LogLevelDebug, "issuing raw sasl authenticate", "step", step)
_, err = cxn.conn.Write(buf)
if wt > 0 {
cxn.conn.SetWriteDeadline(time.Time{})
Expand All @@ -596,11 +595,12 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
}

} else {
const authenticateKey = 37
req := &kmsg.SASLAuthenticateRequest{
Version: cxn.versions[authenticateKey],
SASLAuthBytes: clientWrite,
}
req.Version = cxn.versions[req.Key()]
cxn.l.Log(LogLevelDebug, "issuing SASLAuthenticate", "version", req.Version, "step", step)

corrID, err := cxn.writeRequest(req)
if err != nil {
return err
Expand Down
9 changes: 3 additions & 6 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,8 @@ func (cl *Client) fetchMetadata(ctx context.Context, all bool, topics []string)
topics = []string{}
}
tries := 0
const key = 3 // metadata request key
tryStart := time.Now()
retryTimeout := cl.cfg.retryTimeout(key)
retryTimeout := cl.cfg.retryTimeout(3) // 3 is metadata request key
start:
tries++
broker := cl.broker()
Expand Down Expand Up @@ -469,8 +468,7 @@ func (cl *Client) request(ctx context.Context, req kmsg.Request) (kmsg.Response,
var err error
tries := 0
tryStart := time.Now()
key := req.Key()
retryTimeout := cl.cfg.retryTimeout(key)
retryTimeout := cl.cfg.retryTimeout(req.Key())
start:
tries++
if metaReq, isMetaReq := req.(*kmsg.MetadataRequest); isMetaReq {
Expand Down Expand Up @@ -571,10 +569,9 @@ func (cl *Client) loadCoordinator(ctx context.Context, key coordinatorKey) (*bro
}
}

const reqKey = 10
tries := 0
tryStart := time.Now()
retryTimeout := cl.cfg.retryTimeout(reqKey)
retryTimeout := cl.cfg.retryTimeout(10) // 10 is find coordinator key
start:
cl.coordinatorsMu.Lock()
coordinator, ok := cl.coordinators[key]
Expand Down
2 changes: 1 addition & 1 deletion pkg/kversion/kversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func Tip() Versions {
v[25]++ // 2 add offsets to txn
v[26]++ // 2 end txn

v[18]++ // KAFKA-10027 4f96c5b424956355339dd3216c426c1c0388fe9e KIP-584
v[18]++ // 4 api versions KAFKA-10027 4f96c5b424956355339dd3216c426c1c0388fe9e KIP-584

v = append(v,
0, // 50 describe user scram creds, KAFKA-10259 e8524ccd8fca0caac79b844d87e98e9c055f76fb KIP-554
Expand Down

0 comments on commit 5fb0a38

Please sign in to comment.