Skip to content

Commit

Permalink
hooks: namespace all hooks with Broker
Browse files Browse the repository at this point in the history
As the final breaking change in this series, this namespaces all
existing hook methods with "Broker". This gives the client freedom to
add other E2E hooks down the line, or other write / read hooks, etc.
  • Loading branch information
twmb committed Jun 7, 2021
1 parent 8498383 commit 6a048db
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 30 deletions.
14 changes: 7 additions & 7 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (b *broker) handleReqs() {
func (cxn *brokerCxn) hookWriteE2E(key int16, bytesWritten int, writeWait, timeToWrite time.Duration, writeErr error) {
cxn.cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(HookBrokerE2E); ok {
h.OnE2E(cxn.b.meta, key, E2EInfo{
h.OnBrokerE2E(cxn.b.meta, key, BrokerE2E{
BytesWritten: bytesWritten,
WriteWait: writeWait,
TimeToWrite: timeToWrite,
Expand Down Expand Up @@ -519,7 +519,7 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) {
since := time.Since(start)
b.cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(HookBrokerConnect); ok {
h.OnConnect(b.meta, since, conn, err)
h.OnBrokerConnect(b.meta, since, conn, err)
}
})
if err != nil {
Expand Down Expand Up @@ -867,7 +867,7 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim

cxn.cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(HookBrokerWrite); ok {
h.OnWrite(cxn.b.meta, req.Key(), bytesWritten, writeWait, timeToWrite, writeErr)
h.OnBrokerWrite(cxn.b.meta, req.Key(), bytesWritten, writeWait, timeToWrite, writeErr)
}
})
if logger := cxn.cl.cfg.logger; logger.Level() >= LogLevelDebug {
Expand Down Expand Up @@ -1040,9 +1040,9 @@ func (cxn *brokerCxn) readResponse(
cxn.cl.cfg.hooks.each(func(h Hook) {
switch h := h.(type) {
case HookBrokerRead:
h.OnRead(cxn.b.meta, key, bytesRead, readWait, timeToRead, readErr)
h.OnBrokerRead(cxn.b.meta, key, bytesRead, readWait, timeToRead, readErr)
case HookBrokerE2E:
h.OnE2E(cxn.b.meta, key, E2EInfo{
h.OnBrokerE2E(cxn.b.meta, key, BrokerE2E{
BytesWritten: bytesWritten,
BytesRead: bytesRead,
WriteWait: writeWait,
Expand Down Expand Up @@ -1083,7 +1083,7 @@ func (cxn *brokerCxn) readResponse(
func (cxn *brokerCxn) closeConn() {
cxn.cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(HookBrokerDisconnect); ok {
h.OnDisconnect(cxn.b.meta, cxn.conn)
h.OnBrokerDisconnect(cxn.b.meta, cxn.conn)
}
})
cxn.conn.Close()
Expand Down Expand Up @@ -1229,7 +1229,7 @@ func (cxn *brokerCxn) discard() {

cxn.cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(HookBrokerRead); ok {
h.OnRead(cxn.b.meta, 0, nread, 0, timeToRead, err)
h.OnBrokerRead(cxn.b.meta, 0, nread, 0, timeToRead, err)
}
})
if err != nil {
Expand Down
46 changes: 23 additions & 23 deletions pkg/kgo/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,53 +29,53 @@ func (hs hooks) each(fn func(Hook)) {

// HookBrokerConnect is called after a connection to a broker is opened.
type HookBrokerConnect interface {
// OnConnect is passed the broker metadata, how long it took to dial,
// and either the dial's resulting net.Conn or error.
OnConnect(meta BrokerMetadata, dialDur time.Duration, conn net.Conn, err error)
// OnBrokerConnect is passed the broker metadata, how long it took to
// dial, and either the dial's resulting net.Conn or error.
OnBrokerConnect(meta BrokerMetadata, dialDur time.Duration, conn net.Conn, err error)
}

// HookBrokerDisconnect is called when a connection to a broker is closed.
type HookBrokerDisconnect interface {
// OnDisconnect is passed the broker metadata and the connection that
// is closing.
OnDisconnect(meta BrokerMetadata, conn net.Conn)
// OnBrokerDisconnect is passed the broker metadata and the connection
// that is closing.
OnBrokerDisconnect(meta BrokerMetadata, conn net.Conn)
}

// HookBrokerWrite is called after a write to a broker.
//
// Kerberos SASL does not cause write hooks, since it directly writes to the
// connection.
type HookBrokerWrite interface {
// OnWrite is passed the broker metadata, the key for the request that
// was written, the number of bytes that were written (may not be the
// whole request if there was an error), how long the request waited
// before being written (including throttling waiting), how long it
// took to write the request, and any error.
// OnBrokerWrite is passed the broker metadata, the key for the request
// that was written, the number of bytes that were written (may not be
// the whole request if there was an error), how long the request
// waited before being written (including throttling waiting), how long
// it took to write the request, and any error.
//
// The bytes written does not count any tls overhead.
OnWrite(meta BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error)
OnBrokerWrite(meta BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error)
}

// HookBrokerRead is called after a read from a broker.
//
// Kerberos SASL does not cause read hooks, since it directly reads from the
// connection.
type HookBrokerRead interface {
// OnRead is passed the broker metadata, the key for the response that
// was read, the number of bytes read (may not be the whole read if
// there was an error), how long the client waited before reading the
// response, how long it took to read the response, and any error.
// OnBrokerRead is passed the broker metadata, the key for the response
// that was read, the number of bytes read (may not be the whole read
// if there was an error), how long the client waited before reading
// the response, how long it took to read the response, and any error.
//
// The bytes read does not count any tls overhead.
OnRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error)
OnBrokerRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error)
}

// E2EInfo tracks complete information for a write of a request followed by a
// BrokerE2E tracks complete information for a write of a request followed by a
// read of that requests's response.
//
// Note that if this is for a produce request with no acks, there will be no
// read wait / time to read.
type E2EInfo struct {
type BrokerE2E struct {
// BytesWritten is the number of bytes written for this request.
//
// This may not be the whole request if there was an error while writing.
Expand Down Expand Up @@ -117,13 +117,13 @@ type E2EInfo struct {
//
// Kerberos SASL does not cause this hook, since it directly reads from the
// connection.
func (e *E2EInfo) DurationE2E() time.Duration {
func (e *BrokerE2E) DurationE2E() time.Duration {
return e.TimeToWrite + e.ReadWait + e.TimeToRead
}

// Err returns the first of either the write err or the read err. If this
// return is non-nil, the request/response had an error.
func (e *E2EInfo) Err() error {
func (e *BrokerE2E) Err() error {
if e.WriteErr != nil {
return e.WriteErr
}
Expand All @@ -137,10 +137,10 @@ func (e *E2EInfo) Err() error {
// info for a write and a read, which allows for easier e2e metrics. This hook
// can replace both the read and write hook.
type HookBrokerE2E interface {
// OnE2E is passed the broker metadata, the key for the
// OnBrokerE2E is passed the broker metadata, the key for the
// request/response that was written/read, and the e2e info for the
// request and response.
OnE2E(meta BrokerMetadata, key int16, e2e E2EInfo)
OnBrokerE2E(meta BrokerMetadata, key int16, e2e BrokerE2E)
}

// HookBrokerThrottle is called after a response to a request is read
Expand Down

0 comments on commit 6a048db

Please sign in to comment.