From 9dc80fd2978e8fb792f669ec6595cc5dadfc9fa5 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 11 Oct 2020 21:44:57 -0600 Subject: [PATCH] kgo: add hooks See the Hook documentation. Primarily, hooks can be used to layer in metrics. I expect more hooks to be added later. The benefit of hooks is that this library stays agnostic as to which metrics library is used. --- pkg/kgo/broker.go | 226 +++++++++++++++++++++++++++++--------------- pkg/kgo/client.go | 40 ++++---- pkg/kgo/config.go | 13 +++ pkg/kgo/consumer.go | 2 +- pkg/kgo/hooks.go | 72 ++++++++++++++ pkg/kgo/source.go | 2 +- 6 files changed, 255 insertions(+), 100 deletions(-) create mode 100644 pkg/kgo/hooks.go diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index cde83596..1ede2fc4 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -7,6 +7,7 @@ import ( "io" "math" "net" + "strconv" "sync" "sync/atomic" "time" @@ -22,6 +23,7 @@ type promisedReq struct { ctx context.Context req kmsg.Request promise func(kmsg.Response, error) + enqueue time.Time // used to calculate writeWait } type promisedResp struct { @@ -51,6 +53,8 @@ type promisedResp struct { resp kmsg.Response promise func(kmsg.Response, error) + + enqueue time.Time // used to calculate readWait } type waitingResp struct { @@ -59,13 +63,45 @@ type waitingResp struct { err error } +// BrokerMetadata is metadata for a broker. +// +// This struct mirrors kmsg.MetadataResponseBroker. +type BrokerMetadata struct { + // NodeID is the broker node ID. + // + // Seed brokers will have very negative IDs; kgo does not try to map + // seed brokers to loaded brokers. + NodeID int32 + + // Port is the port of the broker. + Port int32 + + // Host is the hostname of the broker. + Host string + + // Rack is an optional rack of the broker. It is invalid to modify this + // field. + // + // Seed brokers will not have a rack. + Rack *string + + _internal struct{} // allow us to add fields later +} + +func (this BrokerMetadata) equals(other kmsg.MetadataResponseBroker) bool { + return this.NodeID == other.NodeID && + this.Port == other.Port && + this.Host == other.Host && + (this.Rack == nil && other.Rack == nil || + this.Rack != nil && other.Rack != nil && *this.Rack == *other.Rack) +} + // broker manages the concept how a client would interact with a broker. type broker struct { cl *Client - // id and addr are the Kafka broker ID and addr for this broker. - id int32 - addr string + addr string // net.JoinHostPort(meta.Host, meta.Port) + meta BrokerMetadata // The cxn fields each manage a single tcp connection to one broker. // Each field is managed serially in handleReqs. 
This means that only @@ -112,12 +148,17 @@ func unknownSeedID(seedNum int) int32 { return int32(math.MinInt32 + seedNum) } -func (cl *Client) newBroker(addr string, id int32) *broker { +func (cl *Client) newBroker(nodeID int32, host string, port int32, rack *string) *broker { br := &broker{ cl: cl, - id: id, - addr: addr, + addr: net.JoinHostPort(host, strconv.Itoa(int(port))), + meta: BrokerMetadata{ + NodeID: nodeID, + Host: host, + Port: port, + Rack: rack, + }, reqs: make(chan promisedReq, 10), } @@ -160,11 +201,12 @@ func (b *broker) do( ) { dead := false + enqueue := time.Now() b.dieMu.RLock() if atomic.LoadInt32(&b.dead) == 1 { dead = true } else { - b.reqs <- promisedReq{ctx, req, promise} + b.reqs <- promisedReq{ctx, req, promise, enqueue} } b.dieMu.RUnlock() @@ -291,20 +333,23 @@ func (b *broker) handleReqs() { default: } - corrID, err := cxn.writeRequest(req) + corrID, err := cxn.writeRequest(time.Since(pr.enqueue), req) + if err != nil { pr.promise(nil, err) cxn.die() continue } - rt, _ := cxn.timeouts(req) + rt, _ := cxn.cl.connTimeoutFn(req) + cxn.waitResp(promisedResp{ corrID, rt, req.IsFlexible() && req.Key() != 18, // response header not flexible if ApiVersions; see promisedResp doc req.ResponseKind(), pr.promise, + time.Now(), }) } } @@ -341,23 +386,18 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn, } cxn := &brokerCxn{ - bufPool: b.cl.bufPool, - addr: b.addr, - conn: conn, - timeouts: b.cl.connTimeoutFn, - l: b.cl.cfg.logger, - reqFormatter: b.cl.reqFormatter, - softwareName: b.cl.cfg.softwareName, - softwareVersion: b.cl.cfg.softwareVersion, - saslCtx: b.cl.ctx, - sasls: b.cl.cfg.sasls, + cl: b.cl, + b: b, + + addr: b.addr, + conn: conn, } if err = cxn.init(b.cl.cfg.maxVersions); err != nil { - b.cl.cfg.logger.Log(LogLevelDebug, "connection initialization failed", "addr", b.addr, "id", b.id, "err", err) - conn.Close() + b.cl.cfg.logger.Log(LogLevelDebug, "connection initialization failed", "addr", b.addr, "id", b.meta.NodeID, "err", err) + cxn.closeConn() return nil, err } - b.cl.cfg.logger.Log(LogLevelDebug, "connection initialized successfully", "addr", b.addr, "id", b.id) + b.cl.cfg.logger.Log(LogLevelDebug, "connection initialized successfully", "addr", b.addr, "id", b.meta.NodeID) *pcxn = cxn return cxn, nil @@ -365,16 +405,23 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn, // connect connects to the broker's addr, returning the new connection. 
func (b *broker) connect(ctx context.Context) (net.Conn, error) { - b.cl.cfg.logger.Log(LogLevelDebug, "opening connection to broker", "addr", b.addr, "id", b.id) + b.cl.cfg.logger.Log(LogLevelDebug, "opening connection to broker", "addr", b.addr, "id", b.meta.NodeID) + start := time.Now() conn, err := b.cl.cfg.dialFn(ctx, "tcp", b.addr) + since := time.Since(start) + b.cl.cfg.hooks.each(func(h Hook) { + if h, ok := h.(BrokerConnectHook); ok { + h.OnConnect(b.meta, since, conn, err) + } + }) if err != nil { - b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker", "addr", b.addr, "id", b.id, "err", err) + b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker", "addr", b.addr, "id", b.meta.NodeID, "err", err) if _, ok := err.(net.Error); ok { return nil, ErrNoDial } return nil, err } else { - b.cl.cfg.logger.Log(LogLevelDebug, "connection opened to broker", "addr", b.addr, "id", b.id) + b.cl.cfg.logger.Log(LogLevelDebug, "connection opened to broker", "addr", b.addr, "id", b.meta.NodeID) } return conn, nil } @@ -382,27 +429,18 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) { // brokerCxn manages an actual connection to a Kafka broker. This is separate // the broker struct to allow lazy connection (re)creation. type brokerCxn struct { - conn net.Conn - addr string - versions [kmsg.MaxKey + 1]int16 - - timeouts func(kmsg.Request) (time.Duration, time.Duration) + conn net.Conn - saslCtx context.Context - sasls []sasl.Mechanism + cl *Client + b *broker - l Logger + addr string + versions [kmsg.MaxKey + 1]int16 mechanism sasl.Mechanism expiry time.Time - // bufPool, corrID, and reqFormatter are used in writing requests. - bufPool bufPool - corrID int32 - reqFormatter *kmsg.RequestFormatter - - softwareName string // for KIP-511 - softwareVersion string // for KIP-511 + corrID int32 // dieMu guards sending to resps in case the connection has died. 
dieMu sync.RWMutex @@ -419,13 +457,13 @@ func (cxn *brokerCxn) init(maxVersions kversion.Versions) error { if maxVersions == nil || len(maxVersions) >= 19 { if err := cxn.requestAPIVersions(); err != nil { - cxn.l.Log(LogLevelError, "unable to request api versions", "err", err) + cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "err", err) return err } } if err := cxn.sasl(); err != nil { - cxn.l.Log(LogLevelError, "unable to initialize sasl", "err", err) + cxn.cl.cfg.logger.Log(LogLevelError, "unable to initialize sasl", "err", err) return err } @@ -439,17 +477,17 @@ func (cxn *brokerCxn) requestAPIVersions() error { start: req := &kmsg.ApiVersionsRequest{ Version: maxVersion, - ClientSoftwareName: cxn.softwareName, - ClientSoftwareVersion: cxn.softwareVersion, + ClientSoftwareName: cxn.cl.cfg.softwareName, + ClientSoftwareVersion: cxn.cl.cfg.softwareVersion, } - cxn.l.Log(LogLevelDebug, "issuing api versions request", "version", maxVersion) - corrID, err := cxn.writeRequest(req) + cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing api versions request", "version", maxVersion) + corrID, err := cxn.writeRequest(0, req) if err != nil { return err } - rt, _ := cxn.timeouts(req) - rawResp, err := readResponse(cxn.conn, corrID, rt, false) // api versions does *not* use flexible response headers; see comment in promisedResp + rt, _ := cxn.cl.connTimeoutFn(req) + rawResp, err := cxn.readResponse(0, req.Key(), corrID, rt, false) // api versions does *not* use flexible response headers; see comment in promisedResp if err != nil { return err } @@ -469,7 +507,7 @@ start: return ErrConnDead } if string(rawResp) == "\x00\x23\x00\x00\x00\x00" { - cxn.l.Log(LogLevelDebug, "kafka does not know our ApiVersions version, downgrading to version 0 and retrying") + cxn.cl.cfg.logger.Log(LogLevelDebug, "kafka does not know our ApiVersions version, downgrading to version 0 and retrying") maxVersion = 0 goto start } @@ -489,15 +527,15 @@ start: } cxn.versions[key.ApiKey] = key.MaxVersion } - cxn.l.Log(LogLevelDebug, "initialized api versions", "versions", cxn.versions) + cxn.cl.cfg.logger.Log(LogLevelDebug, "initialized api versions", "versions", cxn.versions) return nil } func (cxn *brokerCxn) sasl() error { - if len(cxn.sasls) == 0 { + if len(cxn.cl.cfg.sasls) == 0 { return nil } - mechanism := cxn.sasls[0] + mechanism := cxn.cl.cfg.sasls[0] retried := false authenticate := false @@ -506,14 +544,14 @@ start: if mechanism.Name() != "GSSAPI" && cxn.versions[req.Key()] >= 0 { req.Mechanism = mechanism.Name() req.Version = cxn.versions[req.Key()] - cxn.l.Log(LogLevelDebug, "issuing SASLHandshakeRequest") - corrID, err := cxn.writeRequest(req) + cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLHandshakeRequest") + corrID, err := cxn.writeRequest(0, req) if err != nil { return err } - rt, _ := cxn.timeouts(req) - rawResp, err := readResponse(cxn.conn, corrID, rt, req.IsFlexible()) + rt, _ := cxn.cl.connTimeoutFn(req) + rawResp, err := cxn.readResponse(0, req.Key(), corrID, rt, req.IsFlexible()) if err != nil { return err } @@ -525,7 +563,7 @@ start: err = kerr.ErrorForCode(resp.ErrorCode) if err != nil { if !retried && err == kerr.UnsupportedSaslMechanism { - for _, ours := range cxn.sasls[1:] { + for _, ours := range cxn.cl.cfg.sasls[1:] { for _, supported := range resp.SupportedMechanisms { if supported == ours.Name() { mechanism = ours @@ -539,13 +577,13 @@ start: } authenticate = req.Version == 1 } - cxn.l.Log(LogLevelDebug, "beginning sasl authentication", "mechanism", mechanism.Name()) + 
cxn.cl.cfg.logger.Log(LogLevelDebug, "beginning sasl authentication", "mechanism", mechanism.Name()) cxn.mechanism = mechanism return cxn.doSasl(authenticate) } func (cxn *brokerCxn) doSasl(authenticate bool) error { - session, clientWrite, err := cxn.mechanism.Authenticate(cxn.saslCtx, cxn.addr) + session, clientWrite, err := cxn.mechanism.Authenticate(cxn.cl.ctx, cxn.addr) if err != nil { return err } @@ -557,7 +595,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { // Even if we do not wrap our reads/writes in SASLAuthenticate, we // still use the SASLAuthenticate timeouts. - rt, wt := cxn.timeouts(new(kmsg.SASLAuthenticateRequest)) + rt, wt := cxn.cl.connTimeoutFn(new(kmsg.SASLAuthenticateRequest)) // We continue writing until both the challenging is done AND the // responses are done. We can have an additional response once we @@ -568,7 +606,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { var challenge []byte if !authenticate { - buf := cxn.bufPool.get() + buf := cxn.cl.bufPool.get() buf = append(buf[:0], 0, 0, 0, 0) binary.BigEndian.PutUint32(buf, uint32(len(clientWrite))) @@ -577,13 +615,13 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { if wt > 0 { cxn.conn.SetWriteDeadline(time.Now().Add(wt)) } - cxn.l.Log(LogLevelDebug, "issuing raw sasl authenticate", "step", step) + cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing raw sasl authenticate", "step", step) _, err = cxn.conn.Write(buf) if wt > 0 { cxn.conn.SetWriteDeadline(time.Time{}) } - cxn.bufPool.put(buf) + cxn.cl.bufPool.put(buf) if err != nil { return ErrConnDead @@ -599,14 +637,14 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { SASLAuthBytes: clientWrite, } req.Version = cxn.versions[req.Key()] - cxn.l.Log(LogLevelDebug, "issuing SASLAuthenticate", "version", req.Version, "step", step) + cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLAuthenticate", "version", req.Version, "step", step) - corrID, err := cxn.writeRequest(req) + corrID, err := cxn.writeRequest(0, req) if err != nil { return err } if !done { - rawResp, err := readResponse(cxn.conn, corrID, rt, req.IsFlexible()) + rawResp, err := cxn.readResponse(0, req.Key(), corrID, rt, req.IsFlexible()) if err != nil { return err } @@ -644,27 +682,38 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { return fmt.Errorf("invalid short sasl lifetime millis %d", lifetimeMillis) } cxn.expiry = time.Now().Add(time.Duration(lifetimeMillis)*time.Millisecond - time.Second) - cxn.l.Log(LogLevelDebug, "connection has a limited lifetime", "reauthenticate_at", cxn.expiry) + cxn.cl.cfg.logger.Log(LogLevelDebug, "connection has a limited lifetime", "reauthenticate_at", cxn.expiry) } return nil } // writeRequest writes a message request to the broker connection, bumping the // connection's correlation ID as appropriate for the next write. 
-func (cxn *brokerCxn) writeRequest(req kmsg.Request) (int32, error) { - buf := cxn.bufPool.get() - defer cxn.bufPool.put(buf) - buf = cxn.reqFormatter.AppendRequest( +func (cxn *brokerCxn) writeRequest(writeWait time.Duration, req kmsg.Request) (int32, error) { + buf := cxn.cl.bufPool.get() + defer cxn.cl.bufPool.put(buf) + buf = cxn.cl.reqFormatter.AppendRequest( buf[:0], req, cxn.corrID, ) - _, wt := cxn.timeouts(req) + _, wt := cxn.cl.connTimeoutFn(req) if wt > 0 { cxn.conn.SetWriteDeadline(time.Now().Add(wt)) defer cxn.conn.SetWriteDeadline(time.Time{}) } - if _, err := cxn.conn.Write(buf); err != nil { + + writeStart := time.Now() + _, err := cxn.conn.Write(buf) + timeToWrite := time.Since(writeStart) + + cxn.cl.cfg.hooks.each(func(h Hook) { + if h, ok := h.(BrokerWriteHook); ok { + h.OnWrite(cxn.b.meta, req.Key(), len(buf), writeWait, timeToWrite, err) + } + }) + + if err != nil { return 0, ErrConnDead } id := cxn.corrID @@ -695,8 +744,18 @@ func readConn(conn net.Conn, timeout time.Duration) ([]byte, error) { // readResponse reads a response from conn, ensures the correlation ID is // correct, and returns a newly allocated slice on success. -func readResponse(conn net.Conn, corrID int32, timeout time.Duration, flexibleHeader bool) ([]byte, error) { - buf, err := readConn(conn, timeout) +func (cxn *brokerCxn) readResponse(readWait time.Duration, key int16, corrID int32, timeout time.Duration, flexibleHeader bool) ([]byte, error) { + readStart := time.Now() + buf, err := readConn(cxn.conn, timeout) + timeToRead := time.Since(readStart) + + cxn.cl.cfg.hooks.each(func(h Hook) { + if h, ok := h.(BrokerReadHook); ok { + // readConn reads four size bytes in addition to the buf. + h.OnRead(cxn.b.meta, key, 4+len(buf), readWait, timeToRead, err) + } + }) + if err != nil { return nil, err } @@ -705,7 +764,6 @@ func readResponse(conn net.Conn, corrID int32, timeout time.Duration, flexibleHe } gotID := int32(binary.BigEndian.Uint32(buf)) if gotID != corrID { - conn.Close() return nil, ErrCorrelationIDMismatch } // If the response header is flexible, we skip the tags at the end of @@ -718,6 +776,18 @@ func readResponse(conn net.Conn, corrID int32, timeout time.Duration, flexibleHe return buf[4:], nil } +// closeConn is the one place we close broker connections. This is always done +// in either die, which is called when handleResps returns, or if init fails, +// which means we did not succeed enough to start handleResps. +func (cxn *brokerCxn) closeConn() { + cxn.cl.cfg.hooks.each(func(h Hook) { + if h, ok := h.(BrokerDisconnectHook); ok { + h.OnDisconnect(cxn.b.meta, cxn.conn) + } + }) + cxn.conn.Close() +} + // die kills a broker connection (which could be dead already) and replies to // all requests awaiting responses appropriately. 
func (cxn *brokerCxn) die() { @@ -728,7 +798,7 @@ func (cxn *brokerCxn) die() { return } - cxn.conn.Close() + cxn.closeConn() go func() { for pr := range cxn.resps { @@ -765,7 +835,7 @@ func (cxn *brokerCxn) handleResps() { defer cxn.die() // always track our death for pr := range cxn.resps { - raw, err := readResponse(cxn.conn, pr.corrID, pr.readTimeout, pr.flexibleHeader) + raw, err := cxn.readResponse(time.Since(pr.enqueue), pr.resp.Key(), pr.corrID, pr.readTimeout, pr.flexibleHeader) if err != nil { pr.promise(nil, err) return diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 1bb99021..44871569 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -21,7 +21,6 @@ import ( "context" "fmt" "math/rand" - "net" "strconv" "strings" "sync" @@ -94,24 +93,28 @@ func NewClient(opts ...Opt) (*Client, error) { return nil, err } - seedAddrs := make([]string, 0, len(cfg.seedBrokers)) + type hostport struct { + host string + port int32 + } + seeds := make([]hostport, 0, len(cfg.seedBrokers)) for _, seedBroker := range cfg.seedBrokers { addr := seedBroker - port := 9092 // default kafka port - var err error + port := int32(9092) // default kafka port if colon := strings.IndexByte(addr, ':'); colon > 0 { - port, err = strconv.Atoi(addr[colon+1:]) + port64, err := strconv.ParseInt(addr[colon+1:], 10, 64) if err != nil { return nil, fmt.Errorf("unable to parse addr:port in %q", seedBroker) } addr = addr[:colon] + port = int32(port64) } if addr == "localhost" { addr = "127.0.0.1" } - seedAddrs = append(seedAddrs, net.JoinHostPort(addr, strconv.Itoa(port))) + seeds = append(seeds, hostport{addr, port}) } ctx, cancel := context.WithCancel(context.Background()) @@ -155,9 +158,9 @@ func NewClient(opts ...Opt) (*Client, error) { } cl.compressor = compressor - for i, seedAddr := range seedAddrs { - b := cl.newBroker(seedAddr, unknownSeedID(i)) - cl.brokers[b.id] = b + for i, seed := range seeds { + b := cl.newBroker(unknownSeedID(i), seed.host, seed.port, nil) + cl.brokers[b.meta.NodeID] = b cl.anyBroker = append(cl.anyBroker, b) } go cl.updateMetadataLoop() @@ -331,20 +334,17 @@ func (cl *Client) updateBrokers(brokers []kmsg.MetadataResponseBroker) { } for _, broker := range brokers { - addr := net.JoinHostPort(broker.Host, strconv.Itoa(int(broker.Port))) - b, exists := cl.brokers[broker.NodeID] if exists { - delete(cl.brokers, b.id) - if b.addr != addr { + if !b.meta.equals(broker) { b.stopForever() - b = cl.newBroker(addr, b.id) + b = cl.newBroker(broker.NodeID, broker.Host, broker.Port, broker.Rack) } } else { - b = cl.newBroker(addr, broker.NodeID) + b = cl.newBroker(broker.NodeID, broker.Host, broker.Port, broker.Rack) } - newBrokers[b.id] = b + newBrokers[broker.NodeID] = b newAnyBroker = append(newAnyBroker, b) } @@ -953,8 +953,8 @@ func (cl *Client) DiscoveredBrokers() []*Broker { var bs []*Broker for _, broker := range cl.brokers { - if broker.id >= 0 { - bs = append(bs, &Broker{id: broker.id, cl: cl}) + if broker.meta.NodeID >= 0 { + bs = append(bs, &Broker{id: broker.meta.NodeID, cl: cl}) } } return bs @@ -992,7 +992,7 @@ func (cl *Client) handleListGroupsReq(ctx context.Context, req *kmsg.ListGroupsR respErrs := make(chan respErr, len(cl.brokers)) var numReqs int for _, br := range cl.brokers { - if br.id < 0 { + if br.meta.NodeID < 0 { continue // we skip seed brokers } wg.Add(1) @@ -1483,7 +1483,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) (kmsg. // We need to fan that out. 
if t.Topics == nil { for _, broker := range brokers { - if broker.id < 0 { // do not use seed brokers + if broker.meta.NodeID < 0 { // do not use seed brokers continue } broker2req[broker] = new(kmsg.DescribeLogDirsRequest) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 7b49f6fd..9f1b93cb 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -71,6 +71,8 @@ type cfg struct { sasls []sasl.Mechanism + hooks hooks + // ***PRODUCER SECTION*** txnID *string txnTimeout time.Duration @@ -400,6 +402,17 @@ func SASL(sasls ...sasl.Mechanism) Opt { return clientOpt{func(cfg *cfg) { cfg.sasls = append(cfg.sasls, sasls...) }} } +// WithHooks sets hooks to call whenever relevant. +// +// Hooks can be used to layer in metrics (such as prometheus hooks) or anything +// else. The client will call all hooks in order. See the Hooks interface for +// more information, as well as any interface that contains "Hook" in the name +// to know the available hooks. A single hook can implement zero or all hook +// interfaces, and only the hooks that it implements will be called. +func WithHooks(hooks ...Hook) Opt { + return clientOpt{func(cfg *cfg) { cfg.hooks = append(cfg.hooks, hooks...) }} +} + // ********** PRODUCER CONFIGURATION ********** // Acks represents the number of acks a broker leader must have before diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 8586f851..350f291f 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -728,7 +728,7 @@ func (c *consumer) tryOffsetLoad(toLoad offsetsLoad) { c.cl.brokersMu.RUnlock() for broker, brokerLoad := range brokersToLoadFrom { - c.cl.cfg.logger.Log(LogLevelDebug, "offsets to load broker", "broker", broker.id, "load", brokerLoad) + c.cl.cfg.logger.Log(LogLevelDebug, "offsets to load broker", "broker", broker.meta.NodeID, "load", brokerLoad) if len(brokerLoad.list) > 0 { go c.tryBrokerOffsetLoadList(broker, brokerLoad.list) } diff --git a/pkg/kgo/hooks.go b/pkg/kgo/hooks.go new file mode 100644 index 00000000..72ccd0a8 --- /dev/null +++ b/pkg/kgo/hooks.go @@ -0,0 +1,72 @@ +package kgo + +import ( + "net" + "time" +) + +// Hook is a hook to be called when something happens in kgo. +// +// The base Hook interface is useless, but wherever a hook can occur in kgo, +// the client checks if your hook implements an appropriate interface. If so, +// your hook is called. +// +// This allows you to only hook in to behavior you care about, and it allows +// the client to add more hooks in the future. +// +// All hook interfaces in this package have Hook in the name. Hooks must be +// safe for concurrent use. It is expected that hooks are fast; if a hook needs +// to take time, then copy what you need and ensure the hook is async. +type Hook interface{} + +type hooks []Hook + +func (hs hooks) each(fn func(Hook)) { + for _, h := range hs { + fn(h) + } +} + +// BrokerConnectHook is called after a connection to a broker is opened. +type BrokerConnectHook interface { + // OnConnect is passed the broker metadata, how long it took to dial, + // and either the dial's resulting net.Conn or error. + OnConnect(meta BrokerMetadata, dialDur time.Duration, conn net.Conn, err error) +} + +// BrokerDisconnectHook is called when a connection to a broker is closed. +type BrokerDisconnectHook interface { + // OnDisconnect is passed the broker metadata and the connection that + // is closing. + OnDisconnect(meta BrokerMetadata, conn net.Conn) +} + +// BrokerWriteHook is called after a write to a broker. 
+//
+// Kerberos SASL does not cause write hooks, since it directly writes to the
+// connection. This may change in the future such that the sasl authenticate
+// key is used (even though sasl authenticate requests are not being issued).
+type BrokerWriteHook interface {
+	// OnWrite is passed the broker metadata, the key for the request that
+	// was written, the number of bytes written, how long the request
+	// waited before being written, how long it took to write the request,
+	// and any error.
+	//
+	// The bytes written does not count any tls overhead.
+	OnWrite(meta BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error)
+}
+
+// BrokerReadHook is called after a read from a broker.
+//
+// Kerberos SASL does not cause read hooks, since it directly reads from the
+// connection. This may change in the future such that the sasl authenticate
+// key is used (even though sasl authenticate requests are not being issued).
+type BrokerReadHook interface {
+	// OnRead is passed the broker metadata, the key for the response that
+	// was read, the number of bytes read, how long the client waited
+	// before reading the response, how long it took to read the response,
+	// and any error.
+	//
+	// The bytes read does not count any tls overhead.
+	OnRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error)
+}
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 7c79dbb4..d04c5919 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -568,7 +568,7 @@ func (s *source) handleReqResp(req *fetchRequest, kresp kmsg.Response, err error
 			if partOffset.currentPreferredReplica == -1 {
 				reloadOffsets.list.setLoadOffset(topic, partition, s.cl.cfg.resetOffset, -1, req.maxSeq)
 			} else if partOffset.offset < fetchPart.LogStartOffset {
-				reloadOffsets.list.setLoadOffset(topic, partition, s.cl.cfg.resetOffset, s.b.id, req.maxSeq)
+				reloadOffsets.list.setLoadOffset(topic, partition, s.cl.cfg.resetOffset, s.b.meta.NodeID, req.maxSeq)
 			} else { // partOffset.offset > fetchPart.HighWatermark
 				reloadOffsets.epoch.setLoadOffset(topic, partition, Offset{
 					request: partOffset.offset,
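
Usage sketch (illustrative, not part of the patch): a single hook value can implement any subset of the hook interfaces added in hooks.go and is registered with WithHooks. The hook method signatures below are the ones introduced by this patch; the module import path and the SeedBrokers option are assumptions standing in for the real package path and seed-broker client option.

package main

import (
	"fmt"
	"net"
	"sync/atomic"
	"time"

	"github.com/twmb/kgo/pkg/kgo" // assumed import path; use the module's real path
)

// connMetrics is a hypothetical hook that counts successful dials and the
// bytes moved per client. It implements only the hook interfaces it cares
// about; the client ignores hook interfaces a value does not implement.
type connMetrics struct {
	connects     int64
	bytesWritten int64
	bytesRead    int64
}

// OnConnect implements BrokerConnectHook.
func (m *connMetrics) OnConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) {
	if err == nil {
		atomic.AddInt64(&m.connects, 1)
	}
}

// OnWrite implements BrokerWriteHook.
func (m *connMetrics) OnWrite(meta kgo.BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
	atomic.AddInt64(&m.bytesWritten, int64(bytesWritten))
}

// OnRead implements BrokerReadHook.
func (m *connMetrics) OnRead(meta kgo.BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
	atomic.AddInt64(&m.bytesRead, int64(bytesRead))
}

func main() {
	metrics := new(connMetrics)
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // SeedBrokers is assumed to be the seed-broker option
		kgo.WithHooks(metrics),            // hooks are called in registration order
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// ... produce or consume as usual; the counters accumulate as the
	// client dials brokers and issues requests.
	fmt.Println("connects so far:", atomic.LoadInt64(&metrics.connects))
}

Because Hook is an empty interface, the client type-asserts each registered hook against the relevant hook interface at each call site (see hooks.each in the diff), so a value like connMetrics only pays for the events it actually implements.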