From f591593c733e05b5d6fd540d959f09c61cfea02c Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 12 Aug 2021 01:00:10 -0600 Subject: [PATCH] broker: permanently store the initial ApiVersions response Per-broker, there is no reason to re-issue an ApiVersions request every time a connection is initialized. Instead, we can issue it once on the first connection (or concurrently, if two connection loads see it uninitialized) and store the result. After the result is stored, we will use our stored version forever. This speeds up reconnects by avoiding one round trip, and allows us to probe brokers to see if they actually support something (see incoming commits). ApiVersions is the first thing we do when connecting, so if we load a connection successfully, we know that we have loaded versions. --- pkg/kgo/broker.go | 80 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 60 insertions(+), 20 deletions(-) diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 838d1696..899f093b 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -109,6 +109,12 @@ type broker struct { addr string // net.JoinHostPort(meta.Host, meta.Port) meta BrokerMetadata + // versions tracks the first load of an ApiVersions. We store this + // after the first connect, which helps speed things up on future + // reconnects (across any of the three broker connections) because we + // will never look up API versions for this broker again. + versions atomic.Value // *brokerVersions + // The cxn fields each manage a single tcp connection to one broker. // Each field is managed serially in handleReqs. This means that only // one write can happen at a time, regardless of which connection the @@ -132,6 +138,33 @@ type broker struct { dead int32 } +// brokerVersions is loaded once (and potentially a few times concurrently if +// multiple connections are opening at once) and then forever stored for a +// broker. +type brokerVersions struct { + versions [kmsg.MaxKey + 1]int16 +} + +func newBrokerVersions() *brokerVersions { + var v brokerVersions + for i := range &v.versions { + v.versions[i] = -1 + } + return &v +} + +func (v *brokerVersions) len() int { return kmsg.MaxKey + 1 } + +func (b *broker) loadVersions() *brokerVersions { + loaded := b.versions.Load() + if loaded == nil { + return nil + } + return loaded.(*brokerVersions) +} + +func (b *broker) storeVersions(v *brokerVersions) { b.versions.Store(v) } + const unknownControllerID = -1 var unknownBrokerMetadata = BrokerMetadata{ @@ -159,6 +192,7 @@ func (cl *Client) newBroker(nodeID int32, host string, port int32, rack *string) reqs: make(chan promisedReq, 10), } + go br.handleReqs() return br @@ -248,16 +282,17 @@ func (b *broker) handleReqs() { } } - if int(req.Key()) > len(cxn.versions[:]) || - b.cl.cfg.maxVersions != nil && !b.cl.cfg.maxVersions.HasKey(req.Key()) { + v := b.loadVersions() + + if int(req.Key()) > v.len() || b.cl.cfg.maxVersions != nil && !b.cl.cfg.maxVersions.HasKey(req.Key()) { pr.promise(nil, errUnknownRequestKey) continue } - // If cxn.versions[0] is non-negative, then we loaded API + // If v.versions[0] is non-negative, then we loaded API // versions. If the version for this request is negative, we // know the broker cannot handle this request. - if cxn.versions[0] >= 0 && cxn.versions[req.Key()] < 0 { + if v.versions[0] >= 0 && v.versions[req.Key()] < 0 { pr.promise(nil, errBrokerTooOld) continue } @@ -274,7 +309,7 @@ func (b *broker) handleReqs() { // versions because the client is pinned pre 0.10.0 and we // stick with our max. version := ourMax - if brokerMax := cxn.versions[req.Key()]; brokerMax >= 0 && brokerMax < ourMax { + if brokerMax := v.versions[req.Key()]; brokerMax >= 0 && brokerMax < ourMax { version = brokerMax } @@ -546,8 +581,7 @@ type brokerCxn struct { cl *Client b *broker - addr string - versions [kmsg.MaxKey + 1]int16 + addr string mechanism sasl.Mechanism expiry time.Time @@ -575,14 +609,17 @@ type brokerCxn struct { } func (cxn *brokerCxn) init(isProduceCxn bool) error { - for i := 0; i < len(cxn.versions[:]); i++ { - cxn.versions[i] = -1 - } - - if cxn.b.cl.cfg.maxVersions == nil || cxn.b.cl.cfg.maxVersions.HasKey(18) { - if err := cxn.requestAPIVersions(); err != nil { - cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", logID(cxn.b.meta.NodeID), "err", err) - return err + hasVersions := cxn.b.loadVersions() != nil + if !hasVersions { + if cxn.b.cl.cfg.maxVersions == nil || cxn.b.cl.cfg.maxVersions.HasKey(18) { + if err := cxn.requestAPIVersions(); err != nil { + cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", logID(cxn.b.meta.NodeID), "err", err) + return err + } + } else { + // We have a max versions, and it indicates no support + // for ApiVersions. We just store a default -1 set. + cxn.b.storeVersions(newBrokerVersions()) } } @@ -667,12 +704,14 @@ start: return errors.New("ApiVersions response invalidly contained no ApiKeys") } + v := newBrokerVersions() for _, key := range resp.ApiKeys { - if key.ApiKey > kmsg.MaxKey { + if key.ApiKey > kmsg.MaxKey || key.ApiKey < 0 { continue } - cxn.versions[key.ApiKey] = key.MaxVersion + v.versions[key.ApiKey] = key.MaxVersion } + cxn.b.storeVersions(v) return nil } @@ -684,11 +723,12 @@ func (cxn *brokerCxn) sasl() error { retried := false authenticate := false + v := cxn.b.loadVersions() req := new(kmsg.SASLHandshakeRequest) start: - if mechanism.Name() != "GSSAPI" && cxn.versions[req.Key()] >= 0 { + if mechanism.Name() != "GSSAPI" && v.versions[req.Key()] >= 0 { req.Mechanism = mechanism.Name() - req.Version = cxn.versions[req.Key()] + req.Version = v.versions[req.Key()] cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLHandshakeRequest", "broker", logID(cxn.b.meta.NodeID)) corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req) if writeErr != nil { @@ -776,7 +816,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { req := &kmsg.SASLAuthenticateRequest{ SASLAuthBytes: clientWrite, } - req.Version = cxn.versions[req.Key()] + req.Version = cxn.b.loadVersions().versions[req.Key()] cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLAuthenticate", "broker", logID(cxn.b.meta.NodeID), "version", req.Version, "step", step) corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)