Skip to content

Commit

Permalink
hooks: add HookBrokerE2E
Browse files Browse the repository at this point in the history
Previously, it was very difficult to track e2e information. This new
hook makes it possible and easy.

The only downside is that now we are passing around a lot more
arguments, but ultimately this is non-impactful in comparison to a
network write.

This hook can completely replace the read and write hooks, but it's
easy enough to keep those around, so we might as well.
  • Loading branch information
twmb committed May 27, 2021
1 parent 7ecea25 commit bad47ba
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 48 deletions.
152 changes: 108 additions & 44 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@ type promisedReq struct {
}

type promisedResp struct {
ctx context.Context
corrID int32

readTimeout time.Duration
ctx context.Context

corrID int32
// With flexible headers, we skip tags at the end of the response
// header for now because they're currently unused. However, the
// ApiVersions response uses v0 response header (no tags) even if the
Expand All @@ -53,10 +51,15 @@ type promisedResp struct {
// generator/src/main/java/org/apache/kafka/message/ApiMessageTypeGenerator.java
flexibleHeader bool

resp kmsg.Response
promise func(kmsg.Response, error)
resp kmsg.Response
promise func(kmsg.Response, error)
readTimeout time.Duration

enqueue time.Time // used to calculate readWait
// The following block is used for the read / e2e hooks.
bytesWritten int
writeWait time.Duration
timeToWrite time.Duration
readEnqueue time.Time
}

var unknownMetadata = BrokerMetadata{
Expand Down Expand Up @@ -332,16 +335,18 @@ func (b *broker) handleReqs() {
noResp = &kmsg.ProduceResponse{Version: req.GetVersion()}
}

corrID, err := cxn.writeRequest(pr.ctx, pr.enqueue, req)
corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(pr.ctx, pr.enqueue, req)

if err != nil {
if writeErr != nil {
pr.promise(nil, err)
cxn.die()
cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr)
continue
}

if isNoResp {
pr.promise(noResp, nil)
cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr)
continue
}

Expand All @@ -350,15 +355,31 @@ func (b *broker) handleReqs() {
cxn.waitResp(promisedResp{
pr.ctx,
corrID,
rt,
req.IsFlexible() && req.Key() != 18, // response header not flexible if ApiVersions; see promisedResp doc
req.ResponseKind(),
pr.promise,
time.Now(),
rt,
bytesWritten,
writeWait,
timeToWrite,
readEnqueue,
})
}
}

// hookWriteE2E notifies every configured hook that implements HookBrokerE2E
// about the write half of a request. The E2EInfo handed to OnE2E carries only
// the write-side fields (bytes written, write wait, time to write and the
// write error); the read-side fields are left at their zero values.
func (cxn *brokerCxn) hookWriteE2E(key int16, bytesWritten int, writeWait, timeToWrite time.Duration, writeErr error) {
	notify := func(hook Hook) {
		e2e, implemented := hook.(HookBrokerE2E)
		if !implemented {
			// Hooks that do not implement HookBrokerE2E are skipped.
			return
		}
		e2e.OnE2E(cxn.b.meta, key, E2EInfo{
			BytesWritten: bytesWritten,
			WriteWait:    writeWait,
			TimeToWrite:  timeToWrite,
			WriteErr:     writeErr,
		})
	}
	cxn.cl.cfg.hooks.each(notify)
}

// bufPool lets buffers for issued requests be reused across writes to
// brokers rather than being allocated anew each time.
type bufPool struct {
	// p is the underlying sync.Pool.
	p *sync.Pool
}

Expand Down Expand Up @@ -589,13 +610,15 @@ start:
ClientSoftwareVersion: cxn.cl.cfg.softwareVersion,
}
cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing api versions request", "broker", cxn.b.meta.NodeID, "version", maxVersion)
corrID, err := cxn.writeRequest(nil, time.Now(), req)
if err != nil {
return err
corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)
if writeErr != nil {
cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr)
return writeErr
}

rt, _ := cxn.cl.connTimeoutFn(req)
rawResp, err := cxn.readResponse(nil, rt, time.Now(), req.Key(), req.GetVersion(), corrID, false) // api versions does *not* use flexible response headers; see comment in promisedResp
// api versions does *not* use flexible response headers; see comment in promisedResp
rawResp, err := cxn.readResponse(nil, req.Key(), req.GetVersion(), corrID, false, rt, bytesWritten, writeWait, timeToWrite, readEnqueue)
if err != nil {
return err
}
Expand Down Expand Up @@ -656,13 +679,14 @@ start:
req.Mechanism = mechanism.Name()
req.Version = cxn.versions[req.Key()]
cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLHandshakeRequest", "broker", cxn.b.meta.NodeID)
corrID, err := cxn.writeRequest(nil, time.Now(), req)
if err != nil {
return err
corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)
if writeErr != nil {
cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr)
return writeErr
}

rt, _ := cxn.cl.connTimeoutFn(req)
rawResp, err := cxn.readResponse(nil, rt, time.Now(), req.Key(), req.GetVersion(), corrID, req.IsFlexible())
rawResp, err := cxn.readResponse(nil, req.Key(), req.GetVersion(), corrID, req.IsFlexible(), rt, bytesWritten, writeWait, timeToWrite, readEnqueue)
if err != nil {
return err
}
Expand Down Expand Up @@ -724,7 +748,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
buf = append(buf, clientWrite...)

cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing raw sasl authenticate", "broker", cxn.b.meta.NodeID, "step", step)
_, err, _, _ = cxn.writeConn(context.Background(), buf, wt, time.Now())
_, err, _, _, _ = cxn.writeConn(context.Background(), buf, wt, time.Now())

cxn.cl.bufPool.put(buf)

Expand All @@ -744,12 +768,19 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
req.Version = cxn.versions[req.Key()]
cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLAuthenticate", "broker", cxn.b.meta.NodeID, "version", req.Version, "step", step)

corrID, err := cxn.writeRequest(nil, time.Now(), req)
if err != nil {
return err
corrID, bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue := cxn.writeRequest(nil, time.Now(), req)

// As mentioned above, we could have one final write
// without reading a response back (kerberos). If this
// is the case, we need to e2e.
if writeErr != nil || done {
cxn.hookWriteE2E(req.Key(), bytesWritten, writeWait, timeToWrite, writeErr)
if writeErr != nil {
return writeErr
}
}
if !done {
rawResp, err := cxn.readResponse(nil, rt, time.Now(), req.Key(), req.GetVersion(), corrID, req.IsFlexible())
rawResp, err := cxn.readResponse(nil, req.Key(), req.GetVersion(), corrID, req.IsFlexible(), rt, bytesWritten, writeWait, timeToWrite, readEnqueue)
if err != nil {
return err
}
Expand Down Expand Up @@ -795,7 +826,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {

// writeRequest writes a message request to the broker connection, bumping the
// connection's correlation ID as appropriate for the next write.
func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt time.Time, req kmsg.Request) (int32, error) {
func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt time.Time, req kmsg.Request) (corrID int32, bytesWritten int, writeErr error, writeWait, timeToWrite time.Duration, readEnqueue time.Time) {
// A nil ctx means we cannot be throttled.
if ctx != nil {
throttleUntil := time.Unix(0, atomic.LoadInt64(&cxn.throttleUntil))
Expand All @@ -805,13 +836,16 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim
case <-after.C:
case <-ctx.Done():
after.Stop()
return 0, ctx.Err()
writeErr = ctx.Err()
case <-cxn.cl.ctx.Done():
after.Stop()
return 0, errClientClosing
writeErr = errClientClosing
case <-cxn.deadCh:
writeErr = errChosenBrokerDead
}
if writeErr != nil {
after.Stop()
return 0, errChosenBrokerDead
writeWait = time.Since(enqueuedForWritingAt)
return
}
}
}
Expand All @@ -825,7 +859,7 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim
)

_, wt := cxn.cl.connTimeoutFn(req)
bytesWritten, writeErr, writeWait, timeToWrite := cxn.writeConn(ctx, buf, wt, enqueuedForWritingAt)
bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue = cxn.writeConn(ctx, buf, wt, enqueuedForWritingAt)

cxn.cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(HookBrokerWrite); ok {
Expand All @@ -837,14 +871,14 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim
}

if writeErr != nil {
return 0, writeErr
return
}
id := cxn.corrID
corrID = cxn.corrID
cxn.corrID++
return id, nil
return
}

func (cxn *brokerCxn) writeConn(ctx context.Context, buf []byte, timeout time.Duration, enqueuedForWritingAt time.Time) (bytesWritten int, writeErr error, writeWait, timeToWrite time.Duration) {
func (cxn *brokerCxn) writeConn(ctx context.Context, buf []byte, timeout time.Duration, enqueuedForWritingAt time.Time) (bytesWritten int, writeErr error, writeWait, timeToWrite time.Duration, readEnqueue time.Time) {
atomic.SwapUint32(&cxn.writing, 1)
defer func() {
atomic.StoreInt64(&cxn.lastWrite, time.Now().UnixNano())
Expand All @@ -863,8 +897,11 @@ func (cxn *brokerCxn) writeConn(ctx context.Context, buf []byte, timeout time.Du
defer close(writeDone)
writeStart := time.Now()
bytesWritten, writeErr = cxn.conn.Write(buf)
timeToWrite = time.Since(writeStart)
// As soon as we are done writing, we track that we have now
// enqueued this request for reading.
readEnqueue = time.Now()
writeWait = writeStart.Sub(enqueuedForWritingAt)
timeToWrite = readEnqueue.Sub(writeStart)
}()
select {
case <-writeDone:
Expand Down Expand Up @@ -984,20 +1021,45 @@ func (cxn *brokerCxn) parseReadSize(sizeBuf []byte) (int32, error) {

// readResponse reads a response from conn, ensures the correlation ID is
// correct, and returns a newly allocated slice on success.
func (cxn *brokerCxn) readResponse(ctx context.Context, timeout time.Duration, enqueuedForReadingAt time.Time, key, version int16, corrID int32, flexibleHeader bool) ([]byte, error) {
nread, buf, err, readWait, timeToRead := cxn.readConn(ctx, timeout, enqueuedForReadingAt)
//
// This takes a bunch of extra arguments in support of HookBrokerE2E, overall
// this function takes 11 bytes in arguments.
func (cxn *brokerCxn) readResponse(
ctx context.Context,
key int16,
version int16,
corrID int32,
flexibleHeader bool,
timeout time.Duration,
bytesWritten int,
writeWait time.Duration,
timeToWrite time.Duration,
readEnqueue time.Time,
) ([]byte, error) {
bytesRead, buf, readErr, readWait, timeToRead := cxn.readConn(ctx, timeout, readEnqueue)

cxn.cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(HookBrokerRead); ok {
h.OnRead(cxn.b.meta, key, nread, readWait, timeToRead, err)
switch h := h.(type) {
case HookBrokerRead:
h.OnRead(cxn.b.meta, key, bytesRead, readWait, timeToRead, readErr)
case HookBrokerE2E:
h.OnE2E(cxn.b.meta, key, E2EInfo{
BytesWritten: bytesWritten,
BytesRead: bytesRead,
WriteWait: writeWait,
TimeToWrite: timeToWrite,
ReadWait: readWait,
TimeToRead: timeToRead,
ReadErr: readErr,
})
}
})
if logger := cxn.cl.cfg.logger; logger.Level() >= LogLevelDebug {
logger.Log(LogLevelDebug, fmt.Sprintf("read %s v%d", kmsg.NameForKey(key), version), "broker", cxn.b.meta.NodeID, "bytes_read", nread, "read_wait", readWait, "time_to_read", timeToRead, "err", err)
logger.Log(LogLevelDebug, fmt.Sprintf("read %s v%d", kmsg.NameForKey(key), version), "broker", cxn.b.meta.NodeID, "bytes_read", bytesRead, "read_wait", readWait, "time_to_read", timeToRead, "err", readErr)
}

if err != nil {
return nil, err
if readErr != nil {
return nil, readErr
}
if len(buf) < 4 {
return nil, kbin.ErrNotEnoughData
Expand Down Expand Up @@ -1044,6 +1106,7 @@ func (cxn *brokerCxn) die() {
go func() {
for pr := range cxn.resps {
pr.promise(nil, errChosenBrokerDead)
cxn.hookWriteE2E(pr.resp.Key(), pr.bytesWritten, pr.writeWait, pr.timeToWrite, errChosenBrokerDead)
}
}()

Expand All @@ -1068,6 +1131,7 @@ func (cxn *brokerCxn) waitResp(pr promisedResp) {

if dead {
pr.promise(nil, errChosenBrokerDead)
cxn.hookWriteE2E(pr.resp.Key(), pr.bytesWritten, pr.writeWait, pr.timeToWrite, errChosenBrokerDead)
}
}

Expand Down Expand Up @@ -1185,7 +1249,7 @@ func (cxn *brokerCxn) handleResps() {

var successes uint64
for pr := range cxn.resps {
raw, err := cxn.readResponse(pr.ctx, pr.readTimeout, pr.enqueue, pr.resp.Key(), pr.resp.GetVersion(), pr.corrID, pr.flexibleHeader)
rawResp, err := cxn.readResponse(pr.ctx, pr.resp.Key(), pr.resp.GetVersion(), pr.corrID, pr.flexibleHeader, pr.readTimeout, pr.bytesWritten, pr.writeWait, pr.timeToWrite, pr.readEnqueue)
if err != nil {
if successes > 0 || len(cxn.b.cl.cfg.sasls) > 0 {
cxn.b.cl.cfg.logger.Log(LogLevelDebug, "read from broker errored, killing connection", "addr", cxn.b.addr, "id", cxn.b.meta.NodeID, "successful_reads", successes, "err", err)
Expand All @@ -1196,7 +1260,7 @@ func (cxn *brokerCxn) handleResps() {
return
}
successes++
readErr := pr.resp.ReadFrom(raw)
readErr := pr.resp.ReadFrom(rawResp)

// If we had no error, we read the response successfully.
//
Expand Down
Loading

0 comments on commit bad47ba

Please sign in to comment.