From 6a048db1ee2892f0f6e280d5853514866be6b134 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 7 Jun 2021 02:07:28 -0600 Subject: [PATCH] hooks: namespace all hooks with Broker As the final breaking change in this series, this namespaces all existing hook methods with "Broker". This gives the client freedom to add other E2E hooks down the line, or other write / read hooks, etc. --- pkg/kgo/broker.go | 14 +++++++------- pkg/kgo/hooks.go | 46 +++++++++++++++++++++++----------------------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 3a4c4186..14a02ef2 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -374,7 +374,7 @@ func (b *broker) handleReqs() { func (cxn *brokerCxn) hookWriteE2E(key int16, bytesWritten int, writeWait, timeToWrite time.Duration, writeErr error) { cxn.cl.cfg.hooks.each(func(h Hook) { if h, ok := h.(HookBrokerE2E); ok { - h.OnE2E(cxn.b.meta, key, E2EInfo{ + h.OnBrokerE2E(cxn.b.meta, key, BrokerE2E{ BytesWritten: bytesWritten, WriteWait: writeWait, TimeToWrite: timeToWrite, @@ -519,7 +519,7 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) { since := time.Since(start) b.cl.cfg.hooks.each(func(h Hook) { if h, ok := h.(HookBrokerConnect); ok { - h.OnConnect(b.meta, since, conn, err) + h.OnBrokerConnect(b.meta, since, conn, err) } }) if err != nil { @@ -867,7 +867,7 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim cxn.cl.cfg.hooks.each(func(h Hook) { if h, ok := h.(HookBrokerWrite); ok { - h.OnWrite(cxn.b.meta, req.Key(), bytesWritten, writeWait, timeToWrite, writeErr) + h.OnBrokerWrite(cxn.b.meta, req.Key(), bytesWritten, writeWait, timeToWrite, writeErr) } }) if logger := cxn.cl.cfg.logger; logger.Level() >= LogLevelDebug { @@ -1040,9 +1040,9 @@ func (cxn *brokerCxn) readResponse( cxn.cl.cfg.hooks.each(func(h Hook) { switch h := h.(type) { case HookBrokerRead: - h.OnRead(cxn.b.meta, key, bytesRead, readWait, timeToRead, readErr) + h.OnBrokerRead(cxn.b.meta, key, bytesRead, readWait, timeToRead, readErr) case HookBrokerE2E: - h.OnE2E(cxn.b.meta, key, E2EInfo{ + h.OnBrokerE2E(cxn.b.meta, key, BrokerE2E{ BytesWritten: bytesWritten, BytesRead: bytesRead, WriteWait: writeWait, @@ -1083,7 +1083,7 @@ func (cxn *brokerCxn) readResponse( func (cxn *brokerCxn) closeConn() { cxn.cl.cfg.hooks.each(func(h Hook) { if h, ok := h.(HookBrokerDisconnect); ok { - h.OnDisconnect(cxn.b.meta, cxn.conn) + h.OnBrokerDisconnect(cxn.b.meta, cxn.conn) } }) cxn.conn.Close() @@ -1229,7 +1229,7 @@ func (cxn *brokerCxn) discard() { cxn.cl.cfg.hooks.each(func(h Hook) { if h, ok := h.(HookBrokerRead); ok { - h.OnRead(cxn.b.meta, 0, nread, 0, timeToRead, err) + h.OnBrokerRead(cxn.b.meta, 0, nread, 0, timeToRead, err) } }) if err != nil { diff --git a/pkg/kgo/hooks.go b/pkg/kgo/hooks.go index bb86f012..57d5afed 100644 --- a/pkg/kgo/hooks.go +++ b/pkg/kgo/hooks.go @@ -29,16 +29,16 @@ func (hs hooks) each(fn func(Hook)) { // HookBrokerConnect is called after a connection to a broker is opened. type HookBrokerConnect interface { - // OnConnect is passed the broker metadata, how long it took to dial, - // and either the dial's resulting net.Conn or error. - OnConnect(meta BrokerMetadata, dialDur time.Duration, conn net.Conn, err error) + // OnBrokerConnect is passed the broker metadata, how long it took to + // dial, and either the dial's resulting net.Conn or error. + OnBrokerConnect(meta BrokerMetadata, dialDur time.Duration, conn net.Conn, err error) } // HookBrokerDisconnect is called when a connection to a broker is closed. type HookBrokerDisconnect interface { - // OnDisconnect is passed the broker metadata and the connection that - // is closing. - OnDisconnect(meta BrokerMetadata, conn net.Conn) + // OnBrokerDisconnect is passed the broker metadata and the connection + // that is closing. + OnBrokerDisconnect(meta BrokerMetadata, conn net.Conn) } // HookBrokerWrite is called after a write to a broker. @@ -46,14 +46,14 @@ type HookBrokerDisconnect interface { // Kerberos SASL does not cause write hooks, since it directly writes to the // connection. type HookBrokerWrite interface { - // OnWrite is passed the broker metadata, the key for the request that - // was written, the number of bytes that were written (may not be the - // whole request if there was an error), how long the request waited - // before being written (including throttling waiting), how long it - // took to write the request, and any error. + // OnBrokerWrite is passed the broker metadata, the key for the request + // that was written, the number of bytes that were written (may not be + // the whole request if there was an error), how long the request + // waited before being written (including throttling waiting), how long + // it took to write the request, and any error. // // The bytes written does not count any tls overhead. - OnWrite(meta BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) + OnBrokerWrite(meta BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) } // HookBrokerRead is called after a read from a broker. @@ -61,21 +61,21 @@ type HookBrokerWrite interface { // Kerberos SASL does not cause read hooks, since it directly reads from the // connection. type HookBrokerRead interface { - // OnRead is passed the broker metadata, the key for the response that - // was read, the number of bytes read (may not be the whole read if - // there was an error), how long the client waited before reading the - // response, how long it took to read the response, and any error. + // OnBrokerRead is passed the broker metadata, the key for the response + // that was read, the number of bytes read (may not be the whole read + // if there was an error), how long the client waited before reading + // the response, how long it took to read the response, and any error. // // The bytes read does not count any tls overhead. - OnRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) + OnBrokerRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) } -// E2EInfo tracks complete information for a write of a request followed by a +// BrokerE2E tracks complete information for a write of a request followed by a // read of that requests's response. // // Note that if this is for a produce request with no acks, there will be no // read wait / time to read. -type E2EInfo struct { +type BrokerE2E struct { // BytesWritten is the number of bytes written for this request. // // This may not be the whole request if there was an error while writing. @@ -117,13 +117,13 @@ type E2EInfo struct { // // Kerberos SASL does not cause this hook, since it directly reads from the // connection. -func (e *E2EInfo) DurationE2E() time.Duration { +func (e *BrokerE2E) DurationE2E() time.Duration { return e.TimeToWrite + e.ReadWait + e.TimeToRead } // Err returns the first of either the write err or the read err. If this // return is non-nil, the request/response had an error. -func (e *E2EInfo) Err() error { +func (e *BrokerE2E) Err() error { if e.WriteErr != nil { return e.WriteErr } @@ -137,10 +137,10 @@ func (e *E2EInfo) Err() error { // info for a write and a read, which allows for easier e2e metrics. This hook // can replace both the read and write hook. type HookBrokerE2E interface { - // OnE2E is passed the broker metadata, the key for the + // OnBrokerE2E is passed the broker metadata, the key for the // request/response that was written/read, and the e2e info for the // request and response. - OnE2E(meta BrokerMetadata, key int16, e2e E2EInfo) + OnBrokerE2E(meta BrokerMetadata, key int16, e2e BrokerE2E) } // HookBrokerThrottle is called after a response to a request is read