diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 61f028f0..1adfba18 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -28,11 +28,9 @@ type promisedReq struct { } type promisedResp struct { - ctx context.Context - corrID int32 - - readTimeout time.Duration + ctx context.Context + corrID int32 // With flexible headers, we skip tags at the end of the response // header for now because they're currently unused. However, the // ApiVersions response uses v0 response header (no tags) even if the @@ -53,10 +51,15 @@ type promisedResp struct { // generator/src/main/java/org/apache/kafka/message/ApiMessageTypeGenerator.java flexibleHeader bool - resp kmsg.Response - promise func(kmsg.Response, error) + resp kmsg.Response + promise func(kmsg.Response, error) + readTimeout time.Duration - enqueue time.Time // used to calculate readWait + // The following block is used for the read / e2e hooks. + bytesWritten int + writeWait time.Duration + timeToWrite time.Duration + readEnqueue time.Time } var unknownMetadata = BrokerMetadata{ @@ -332,16 +335,18 @@ func (b *broker) handleReqs() { noResp = &kmsg.ProduceResponse{Version: req.GetVersion()} } - corrID, err := cxn.writeRequest(pr.ctx, pr.enqueue, req) + corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(pr.ctx, pr.enqueue, req) - if err != nil { + if writeErr != nil { pr.promise(nil, err) cxn.die() + cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr) continue } if isNoResp { pr.promise(noResp, nil) + cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr) continue } @@ -350,15 +355,31 @@ func (b *broker) handleReqs() { cxn.waitResp(promisedResp{ pr.ctx, corrID, - rt, req.IsFlexible() && req.Key() != 18, // response header not flexible if ApiVersions; see promisedResp doc req.ResponseKind(), pr.promise, - time.Now(), + rt, + bytesWritten, + writeWait, + timeToWrite, + readEnqueue, }) } } +func (cxn *brokerCxn) hookWriteE2E(key int16, bytesWritten int, writeWait, timeToWrite time.Duration, writeErr error) { + cxn.cl.cfg.hooks.each(func(h Hook) { + if h, ok := h.(HookBrokerE2E); ok { + h.OnE2E(cxn.b.meta, key, E2EInfo{ + BytesWritten: bytesWritten, + WriteWait: writeWait, + TimeToWrite: timeToWrite, + WriteErr: writeErr, + }) + } + }) +} + // bufPool is used to reuse issued-request buffers across writes to brokers. type bufPool struct{ p *sync.Pool } @@ -589,13 +610,15 @@ start: ClientSoftwareVersion: cxn.cl.cfg.softwareVersion, } cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing api versions request", "broker", cxn.b.meta.NodeID, "version", maxVersion) - corrID, err := cxn.writeRequest(nil, time.Now(), req) - if err != nil { - return err + corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req) + if writeErr != nil { + cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr) + return writeErr } rt, _ := cxn.cl.connTimeoutFn(req) - rawResp, err := cxn.readResponse(nil, rt, time.Now(), req.Key(), req.GetVersion(), corrID, false) // api versions does *not* use flexible response headers; see comment in promisedResp + // api versions does *not* use flexible response headers; see comment in promisedResp + rawResp, err := cxn.readResponse(nil, req.Key(), req.GetVersion(), corrID, false, rt, bytesWritten, writeWait, timeToWrite, readEnqueue) if err != nil { return err } @@ -656,13 +679,14 @@ start: req.Mechanism = mechanism.Name() req.Version = cxn.versions[req.Key()] cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLHandshakeRequest", "broker", cxn.b.meta.NodeID) - corrID, err := cxn.writeRequest(nil, time.Now(), req) - if err != nil { - return err + corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req) + if writeErr != nil { + cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr) + return writeErr } rt, _ := cxn.cl.connTimeoutFn(req) - rawResp, err := cxn.readResponse(nil, rt, time.Now(), req.Key(), req.GetVersion(), corrID, req.IsFlexible()) + rawResp, err := cxn.readResponse(nil, req.Key(), req.GetVersion(), corrID, req.IsFlexible(), rt, bytesWritten, writeWait, timeToWrite, readEnqueue) if err != nil { return err } @@ -724,7 +748,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { buf = append(buf, clientWrite...) cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing raw sasl authenticate", "broker", cxn.b.meta.NodeID, "step", step) - _, err, _, _ = cxn.writeConn(context.Background(), buf, wt, time.Now()) + _, err, _, _, _ = cxn.writeConn(context.Background(), buf, wt, time.Now()) cxn.cl.bufPool.put(buf) @@ -744,12 +768,19 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { req.Version = cxn.versions[req.Key()] cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLAuthenticate", "broker", cxn.b.meta.NodeID, "version", req.Version, "step", step) - corrID, err := cxn.writeRequest(nil, time.Now(), req) - if err != nil { - return err + corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req) + + // As mentioned above, we could have one final write + // without reading a response back (kerberos). If this + // is the case, we need to e2e. + if writeErr != nil || done { + cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr) + if writeErr != nil { + return writeErr + } } if !done { - rawResp, err := cxn.readResponse(nil, rt, time.Now(), req.Key(), req.GetVersion(), corrID, req.IsFlexible()) + rawResp, err := cxn.readResponse(nil, req.Key(), req.GetVersion(), corrID, req.IsFlexible(), rt, bytesWritten, writeWait, timeToWrite, readEnqueue) if err != nil { return err } @@ -795,7 +826,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { // writeRequest writes a message request to the broker connection, bumping the // connection's correlation ID as appropriate for the next write. -func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt time.Time, req kmsg.Request) (int32, error) { +func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt time.Time, req kmsg.Request) (corrID int32, bytesWritten int, writeErr error, writeWait, timeToWrite time.Duration, readEnqueue time.Time) { // A nil ctx means we cannot be throttled. if ctx != nil { throttleUntil := time.Unix(0, atomic.LoadInt64(&cxn.throttleUntil)) @@ -805,13 +836,16 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim case <-after.C: case <-ctx.Done(): after.Stop() - return 0, ctx.Err() + writeErr = ctx.Err() case <-cxn.cl.ctx.Done(): - after.Stop() - return 0, errClientClosing + writeErr = errClientClosing case <-cxn.deadCh: + writeErr = errChosenBrokerDead + } + if writeErr != nil { after.Stop() - return 0, errChosenBrokerDead + writeWait = time.Since(enqueuedForWritingAt) + return } } } @@ -825,7 +859,7 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim ) _, wt := cxn.cl.connTimeoutFn(req) - bytesWritten, writeErr, writeWait, timeToWrite := cxn.writeConn(ctx, buf, wt, enqueuedForWritingAt) + bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue = cxn.writeConn(ctx, buf, wt, enqueuedForWritingAt) cxn.cl.cfg.hooks.each(func(h Hook) { if h, ok := h.(HookBrokerWrite); ok { @@ -837,14 +871,14 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim } if writeErr != nil { - return 0, writeErr + return } - id := cxn.corrID + corrID = cxn.corrID cxn.corrID++ - return id, nil + return } -func (cxn *brokerCxn) writeConn(ctx context.Context, buf []byte, timeout time.Duration, enqueuedForWritingAt time.Time) (bytesWritten int, writeErr error, writeWait, timeToWrite time.Duration) { +func (cxn *brokerCxn) writeConn(ctx context.Context, buf []byte, timeout time.Duration, enqueuedForWritingAt time.Time) (bytesWritten int, writeErr error, writeWait, timeToWrite time.Duration, readEnqueue time.Time) { atomic.SwapUint32(&cxn.writing, 1) defer func() { atomic.StoreInt64(&cxn.lastWrite, time.Now().UnixNano()) @@ -863,8 +897,11 @@ func (cxn *brokerCxn) writeConn(ctx context.Context, buf []byte, timeout time.Du defer close(writeDone) writeStart := time.Now() bytesWritten, writeErr = cxn.conn.Write(buf) - timeToWrite = time.Since(writeStart) + // As soon as we are done writing, we track that we have now + // enqueued this request for reading. + readEnqueue = time.Now() writeWait = writeStart.Sub(enqueuedForWritingAt) + timeToWrite = readEnqueue.Sub(writeStart) }() select { case <-writeDone: @@ -984,20 +1021,45 @@ func (cxn *brokerCxn) parseReadSize(sizeBuf []byte) (int32, error) { // readResponse reads a response from conn, ensures the correlation ID is // correct, and returns a newly allocated slice on success. -func (cxn *brokerCxn) readResponse(ctx context.Context, timeout time.Duration, enqueuedForReadingAt time.Time, key, version int16, corrID int32, flexibleHeader bool) ([]byte, error) { - nread, buf, err, readWait, timeToRead := cxn.readConn(ctx, timeout, enqueuedForReadingAt) +// +// This takes a bunch of extra arguments in support of HookBrokerE2E, overall +// this function takes 11 bytes in arguments. +func (cxn *brokerCxn) readResponse( + ctx context.Context, + key int16, + version int16, + corrID int32, + flexibleHeader bool, + timeout time.Duration, + bytesWritten int, + writeWait time.Duration, + timeToWrite time.Duration, + readEnqueue time.Time, +) ([]byte, error) { + bytesRead, buf, readErr, readWait, timeToRead := cxn.readConn(ctx, timeout, readEnqueue) cxn.cl.cfg.hooks.each(func(h Hook) { - if h, ok := h.(HookBrokerRead); ok { - h.OnRead(cxn.b.meta, key, nread, readWait, timeToRead, err) + switch h := h.(type) { + case HookBrokerRead: + h.OnRead(cxn.b.meta, key, bytesRead, readWait, timeToRead, readErr) + case HookBrokerE2E: + h.OnE2E(cxn.b.meta, key, E2EInfo{ + BytesWritten: bytesWritten, + BytesRead: bytesRead, + WriteWait: writeWait, + TimeToWrite: timeToWrite, + ReadWait: readWait, + TimeToRead: timeToRead, + ReadErr: readErr, + }) } }) if logger := cxn.cl.cfg.logger; logger.Level() >= LogLevelDebug { - logger.Log(LogLevelDebug, fmt.Sprintf("read %s v%d", kmsg.NameForKey(key), version), "broker", cxn.b.meta.NodeID, "bytes_read", nread, "read_wait", readWait, "time_to_read", timeToRead, "err", err) + logger.Log(LogLevelDebug, fmt.Sprintf("read %s v%d", kmsg.NameForKey(key), version), "broker", cxn.b.meta.NodeID, "bytes_read", bytesRead, "read_wait", readWait, "time_to_read", timeToRead, "err", readErr) } - if err != nil { - return nil, err + if readErr != nil { + return nil, readErr } if len(buf) < 4 { return nil, kbin.ErrNotEnoughData @@ -1044,6 +1106,7 @@ func (cxn *brokerCxn) die() { go func() { for pr := range cxn.resps { pr.promise(nil, errChosenBrokerDead) + cxn.hookWriteE2E(pr.resp.Key(), pr.bytesWritten, pr.writeWait, pr.timeToWrite, errChosenBrokerDead) } }() @@ -1068,6 +1131,7 @@ func (cxn *brokerCxn) waitResp(pr promisedResp) { if dead { pr.promise(nil, errChosenBrokerDead) + cxn.hookWriteE2E(pr.resp.Key(), pr.bytesWritten, pr.writeWait, pr.timeToWrite, errChosenBrokerDead) } } @@ -1185,7 +1249,7 @@ func (cxn *brokerCxn) handleResps() { var successes uint64 for pr := range cxn.resps { - raw, err := cxn.readResponse(pr.ctx, pr.readTimeout, pr.enqueue, pr.resp.Key(), pr.resp.GetVersion(), pr.corrID, pr.flexibleHeader) + rawResp, err := cxn.readResponse(pr.ctx, pr.resp.Key(), pr.resp.GetVersion(), pr.corrID, pr.flexibleHeader, pr.readTimeout, pr.bytesWritten, pr.writeWait, pr.timeToWrite, pr.readEnqueue) if err != nil { if successes > 0 || len(cxn.b.cl.cfg.sasls) > 0 { cxn.b.cl.cfg.logger.Log(LogLevelDebug, "read from broker errored, killing connection", "addr", cxn.b.addr, "id", cxn.b.meta.NodeID, "successful_reads", successes, "err", err) @@ -1196,7 +1260,7 @@ func (cxn *brokerCxn) handleResps() { return } successes++ - readErr := pr.resp.ReadFrom(raw) + readErr := pr.resp.ReadFrom(rawResp) // If we had no error, we read the response successfully. // diff --git a/pkg/kgo/hooks.go b/pkg/kgo/hooks.go index 3baf0ab7..bb86f012 100644 --- a/pkg/kgo/hooks.go +++ b/pkg/kgo/hooks.go @@ -44,8 +44,7 @@ type HookBrokerDisconnect interface { // HookBrokerWrite is called after a write to a broker. // // Kerberos SASL does not cause write hooks, since it directly writes to the -// connection. This may change in the future such that the sasl authenticate -// key is used (even though sasl authenticate requests are not being issued). +// connection. type HookBrokerWrite interface { // OnWrite is passed the broker metadata, the key for the request that // was written, the number of bytes that were written (may not be the @@ -60,8 +59,7 @@ type HookBrokerWrite interface { // HookBrokerRead is called after a read from a broker. // // Kerberos SASL does not cause read hooks, since it directly reads from the -// connection. This may change in the future such that the sasl authenticate -// key is used (even though sasl authenticate requests are not being issued). +// connection. type HookBrokerRead interface { // OnRead is passed the broker metadata, the key for the response that // was read, the number of bytes read (may not be the whole read if @@ -72,6 +70,79 @@ type HookBrokerRead interface { OnRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) } +// E2EInfo tracks complete information for a write of a request followed by a +// read of that requests's response. +// +// Note that if this is for a produce request with no acks, there will be no +// read wait / time to read. +type E2EInfo struct { + // BytesWritten is the number of bytes written for this request. + // + // This may not be the whole request if there was an error while writing. + BytesWritten int + + // BytesRead is the number of bytes read for this requests's response. + // + // This may not be the whole response if there was an error while + // reading, and this will be zero if there was a write error. + BytesRead int + + // WriteWait is the time spent waiting from when this request was + // generated internally in the client to just before the request is + // written to the connection. This number is not included in the + // DurationE2E method. + WriteWait time.Duration + // TimeToWrite is how long a request took to be written on the wire. + // This specifically tracks only how long conn.Write takes. + TimeToWrite time.Duration + // ReadWait tracks the span of time immediately following conn.Write + // until conn.Read begins. + ReadWait time.Duration + // TimeToRead tracks how long conn.Read takes for this request to be + // entirely read. This includes the time it takes to allocate a buffer + // for the response after the initial four size bytes are read. + TimeToRead time.Duration + + // WriteErr is any error encountered during writing. If a write error is + // encountered, no read will be attempted. + WriteErr error + // ReadErr is any error encountered during reading. + ReadErr error +} + +// DurationE2E returns the e2e time from the start of when a request is written +// to the end of when the response for that request was fully read. If a write +// or read error occurs, this hook is called with all information possible at +// the time (e.g., if a write error occurs, all write info is specified). +// +// Kerberos SASL does not cause this hook, since it directly reads from the +// connection. +func (e *E2EInfo) DurationE2E() time.Duration { + return e.TimeToWrite + e.ReadWait + e.TimeToRead +} + +// Err returns the first of either the write err or the read err. If this +// return is non-nil, the request/response had an error. +func (e *E2EInfo) Err() error { + if e.WriteErr != nil { + return e.WriteErr + } + return e.ReadErr +} + +// HookBrokerE2E is called after a write to a broker that errors, or after a +// read to a broker. +// +// This differs from HookBrokerRead and HookBrokerWrite by tracking all E2E +// info for a write and a read, which allows for easier e2e metrics. This hook +// can replace both the read and write hook. +type HookBrokerE2E interface { + // OnE2E is passed the broker metadata, the key for the + // request/response that was written/read, and the e2e info for the + // request and response. + OnE2E(meta BrokerMetadata, key int16, e2e E2EInfo) +} + // HookBrokerThrottle is called after a response to a request is read // from a broker, and the response identifies throttling in effect. type HookBrokerThrottle interface {