producing: add HookProduceBatchWritten
This can be used to calculate compression ratios, as well as throughput
per topic. Throughput _could_ be calculated by the user by adding
something to their own promises, but with a hook, the user can more
easily calculate throughput, as well as average batch size, compression
ratio, bytes per batch, etc., all in one place.
twmb committed Jun 13, 2021
1 parent 20e5912 commit 9810427
Showing 3 changed files with 113 additions and 26 deletions.
32 changes: 32 additions & 0 deletions pkg/kgo/hooks.go
@@ -172,3 +172,35 @@ type HookGroupManageError interface {
// reached).
OnGroupManageError(error)
}

// ProduceBatchMetrics tracks information about successful produces to
// partitions.
type ProduceBatchMetrics struct {
// NumRecords is the number of records that were produced in this
// batch.
NumRecords int

// UncompressedBytes is the number of bytes the records serialized as
// before compression.
UncompressedBytes int

// CompressedBytes is the number of bytes actually written for this
// batch, after compression. If compression is not used, this will be
// equal to UncompressedBytes.
CompressedBytes int

// CompressionType signifies which algorithm the batch was compressed
// with.
//
// 0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is
// zstd.
CompressionType uint8
}

// HookProduceBatchWritten is called whenever a batch is known to be
// successfully produced.
type HookProduceBatchWritten interface {
// OnProduceBatchWritten is called per successful batch written to a
// topic partition.
OnProduceBatchWritten(meta BrokerMetadata, topic string, partition int32, metrics ProduceBatchMetrics)
}
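
As a quick, hypothetical illustration of the hook (not part of this commit): an implementation might log per-batch compression stats, with the compression ratio being UncompressedBytes over CompressedBytes. The type name and log format below are invented for the example.

package kgoexample

import (
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

// compressionLogger logs size and compression stats for every
// successfully written batch.
type compressionLogger struct{}

// Compile-time check that compressionLogger satisfies the hook interface.
var _ kgo.HookProduceBatchWritten = compressionLogger{}

func (compressionLogger) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, partition int32, m kgo.ProduceBatchMetrics) {
	ratio := float64(m.UncompressedBytes) / float64(m.CompressedBytes)
	log.Printf("broker %d: %s[%d]: %d records, %d => %d bytes (%.2fx compression)",
		meta.NodeID, topic, partition, m.NumRecords, m.UncompressedBytes, m.CompressedBytes, ratio)
}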
10 changes: 5 additions & 5 deletions pkg/kgo/produce_request_test.go
@@ -145,7 +145,7 @@ func TestRecBatchAppendTo(t *testing.T) {
var checkNum int
check := func() {
exp := kbatch.AppendTo(nil)
-		gotFull := ourBatch.appendTo(nil, version, 12, 11, true, true, compressor)
+		gotFull, _ := ourBatch.appendTo(nil, version, 12, 11, true, true, compressor)
lengthPrefix := 4
ourBatchSize := (&kbin.Reader{Src: gotFull}).Int32()
if version >= 9 {
@@ -318,11 +318,11 @@ func TestMessageSetAppendTo(t *testing.T) {
kset0rawc = kset0c.AppendTo(nil)
kset1rawc = kset1c.AppendTo(nil)

-		got0raw = ourBatch.appendToAsMessageSet(nil, 1, nil)
-		got1raw = ourBatch.appendToAsMessageSet(nil, 2, nil)
+		got0raw, _ = ourBatch.appendToAsMessageSet(nil, 1, nil)
+		got1raw, _ = ourBatch.appendToAsMessageSet(nil, 2, nil)

-		got0rawc = ourBatch.appendToAsMessageSet(nil, 1, compressor)
-		got1rawc = ourBatch.appendToAsMessageSet(nil, 2, compressor)
+		got0rawc, _ = ourBatch.appendToAsMessageSet(nil, 1, compressor)
+		got1rawc, _ = ourBatch.appendToAsMessageSet(nil, 2, compressor)
)

for i, pair := range []struct {
97 changes: 76 additions & 21 deletions pkg/kgo/sink.go
@@ -53,7 +53,8 @@ type seqResp struct {
resp kmsg.Response
err error
done chan struct{}
-	promise func(kmsg.Response, error)
+	br      *broker
+	promise func(*broker, kmsg.Response, error)
}

func (cl *Client) newSink(nodeID int32) *sink {
@@ -333,8 +334,8 @@ func (s *sink) produce(sem <-chan struct{}) bool {
req.backoffSeq = s.backoffSeq // safe to read outside mu since we are in drain loop

produced = true
-	s.doSequenced(req, func(resp kmsg.Response, err error) {
-		s.handleReqResp(req, resp, err)
+	s.doSequenced(req, func(br *broker, resp kmsg.Response, err error) {
+		s.handleReqResp(br, req, resp, err)
<-sem
})
return moreToDrain
@@ -344,7 +345,7 @@
// are handled in order. We use this guarantee while in handleReqResp below.
func (s *sink) doSequenced(
req kmsg.Request,
-	promise func(kmsg.Response, error),
+	promise func(*broker, kmsg.Response, error),
) {
wait := &seqResp{
done: make(chan struct{}),
@@ -361,6 +362,7 @@
wait.err = err
close(wait.done)
})
wait.br = br
}

s.seqRespsMu.Lock()
@@ -376,7 +378,7 @@
func (s *sink) handleSeqResps(wait *seqResp) {
more:
<-wait.done
-	wait.promise(wait.resp, wait.err)
+	wait.promise(wait.br, wait.resp, wait.err)

s.seqRespsMu.Lock()
s.seqResps = s.seqResps[1:]
@@ -507,7 +509,7 @@ func (s *sink) handleReqClientErr(req *produceRequest, err error) {
}
}

-func (s *sink) handleReqResp(req *produceRequest, resp kmsg.Response, err error) {
+func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response, err error) {
if err != nil {
s.handleReqClientErr(req, err)
return
@@ -527,6 +529,25 @@
}()
}

// We defer updating the produce batch metrics in a goroutine; anything
// that we count as not-written (not the first batch, error) is removed
// from metrics before we return.
defer func() {
if len(req.metrics) > 0 {
s.cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(HookProduceBatchWritten); ok {
go func() {
for topic, partitions := range req.metrics {
for partition, metrics := range partitions { // I heard you liked tabs?
h.OnProduceBatchWritten(br.meta, topic, partition, metrics)
}
}
}()
}
})
}
}()

// If we have no acks, we will have no response. The following block is
// basically an extremely condensed version of everything that follows.
// We *do* retry on error even with no acks, because an error would
@@ -569,23 +590,26 @@ func (s *sink) handleReqResp(req *produceRequest, resp kmsg.Response, err error)
partitions, ok := req.batches[topic]
if !ok {
s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with topic in produce request that we did not produce to", "broker", s.nodeID, "topic", topic)
delete(req.metrics, topic)
continue // should not hit this
}

if debug {
fmt.Fprintf(b, "%s[", topic)
}

tmetrics := req.metrics[topic]
for _, rPartition := range rTopic.Partitions {
partition := rPartition.Partition
batch, ok := partitions[partition]
if !ok {
s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with partition in produce request that we did not produce to", "broker", s.nodeID, "topic", rTopic.Topic, "partition", partition)
delete(tmetrics, partition)
continue // should not hit this
}
delete(partitions, partition)

-		retry := s.handleReqRespBatch(
+		retry, didProduce := s.handleReqRespBatch(
b,
topic,
partition,
@@ -598,6 +622,9 @@
if retry {
reqRetry.addSeqBatch(topic, partition, batch)
}
if !didProduce {
delete(tmetrics, partition)
}
}

if debug {
@@ -630,7 +657,7 @@ func (s *sink) handleReqRespBatch(
producerEpoch int16,
baseOffset int64,
errorCode int16,
-) (retry bool) {
+) (retry, didProduce bool) {
batch.owner.mu.Lock()
defer batch.owner.mu.Unlock()

Expand Down Expand Up @@ -660,7 +687,7 @@ func (s *sink) handleReqRespBatch(
}
}
}
-		return false
+		return false, false
}

// Since we have received a response and we are the first batch, we can
Expand All @@ -676,7 +703,7 @@ func (s *sink) handleReqRespBatch(
if debug {
fmt.Fprintf(b, "retrying@%d,%d(%s)}, ", baseOffset, nrec, err)
}
-		return true
+		return true, false

case err == kerr.OutOfOrderSequenceNumber,
err == kerr.UnknownProducerID,
@@ -732,7 +759,7 @@
if debug {
fmt.Fprintf(b, "fatal@%d,%d(%s)}, ", baseOffset, len(batch.records), err)
}
-			return false
+			return false, false
}
if s.cl.cfg.onDataLoss != nil {
s.cl.cfg.onDataLoss(topic, partition)
@@ -765,7 +792,7 @@
if debug {
fmt.Fprintf(b, "resetting@%d,%d(%s)}, ", baseOffset, len(batch.records), err)
}
-		return true
+		return true, false

case err == kerr.DuplicateSequenceNumber: // ignorable, but we should not get
s.cl.cfg.logger.Log(LogLevelInfo, "received unexpected duplicate sequence number, ignoring and treating batch as successful",
@@ -787,6 +814,7 @@
)
}
s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, partition, baseOffset, err)
didProduce = err == nil
if debug {
if err != nil {
fmt.Fprintf(b, "err@%d,%d(%s)}, ", baseOffset, len(batch.records), err)
@@ -795,7 +823,7 @@
}
}
}
-	return false // no retry
+	return false, didProduce // no retry
}

// finishBatch removes a batch from its owning record buffer and finishes all
Expand Down Expand Up @@ -1324,6 +1352,12 @@ type produceRequest struct {
producerEpoch int16
idempotent bool

// Initialized in AppendTo, metrics tracks uncompressed & compressed
// sizes (in bytes) of each batch.
//
// We use this in handleReqResp for the HookProduceBatchWritten hook.
metrics map[string]map[int32]ProduceBatchMetrics

compressor *compressor

// wireLength is initially the size of sending a produce request,
Expand Down Expand Up @@ -1637,6 +1671,8 @@ func (p *produceRequest) IsFlexible() bool { return p.version >= 9 }
func (p *produceRequest) AppendTo(dst []byte) []byte {
flexible := p.IsFlexible()

p.metrics = make(map[string]map[int32]ProduceBatchMetrics)

if p.version >= 3 {
if flexible {
dst = kbin.AppendCompactNullableString(dst, p.txnID)
@@ -1661,6 +1697,8 @@
dst = kbin.AppendString(dst, topic)
dst = kbin.AppendArrayLen(dst, len(partitions))
}
tmetrics := make(map[int32]ProduceBatchMetrics)
p.metrics[topic] = tmetrics
for partition, batch := range partitions {
dst = kbin.AppendInt32(dst, partition)
batch.mu.Lock()
@@ -1673,12 +1711,14 @@
batch.mu.Unlock()
continue
}
var pmetrics ProduceBatchMetrics
if p.version < 3 {
-				dst = batch.appendToAsMessageSet(dst, uint8(p.version), p.compressor)
+				dst, pmetrics = batch.appendToAsMessageSet(dst, uint8(p.version), p.compressor)
} else {
-				dst = batch.appendTo(dst, p.version, p.producerID, p.producerEpoch, p.idempotent, p.txnID != nil, p.compressor)
+				dst, pmetrics = batch.appendTo(dst, p.version, p.producerID, p.producerEpoch, p.idempotent, p.txnID != nil, p.compressor)
}
batch.mu.Unlock()
tmetrics[partition] = pmetrics
if flexible {
dst = append(dst, 0)
}
@@ -1708,7 +1748,7 @@ func (r seqRecBatch) appendTo(
idempotent bool,
transactional bool,
compressor *compressor,
-) (dst []byte) { // named return so that our defer for flexible versions can modify it
+) (dst []byte, m ProduceBatchMetrics) { // named return so that our defer for flexible versions can modify it
flexible := version >= 9
dst = in
nullableBytesLen := r.wireLength - 4 // NULLABLE_BYTES leading length, minus itself
@@ -1788,8 +1828,12 @@
dst = pnr.appendTo(dst, int32(i))
}

+	toCompress := dst[recordsAt:]
	m.NumRecords = len(r.records)
	m.UncompressedBytes = len(toCompress)
	m.CompressedBytes = m.UncompressedBytes

	if compressor != nil {
-		toCompress := dst[recordsAt:]
w := sliceWriters.Get().(*sliceWriter)
defer sliceWriters.Put(w)

@@ -1800,6 +1844,8 @@
// our compressed was shorter: copy over
copy(dst[recordsAt:], compressed)
dst = dst[:recordsAt+len(compressed)]
m.CompressedBytes = len(compressed)
m.CompressionType = uint8(codec)

// update the few record batch fields we already wrote
savings := int32(len(toCompress) - len(compressed))
@@ -1816,7 +1862,7 @@

kbin.AppendInt32(dst[:crcStart], int32(crc32.Checksum(dst[crcStart+4:], crc32c)))

-	return dst
+	return dst, m
}

func (pnr promisedNumberedRecord) appendTo(dst []byte, offsetDelta int32) []byte {
@@ -1834,7 +1880,9 @@
return dst
}

-func (r seqRecBatch) appendToAsMessageSet(dst []byte, version uint8, compressor *compressor) []byte {
+func (r seqRecBatch) appendToAsMessageSet(dst []byte, version uint8, compressor *compressor) ([]byte, ProduceBatchMetrics) {
var m ProduceBatchMetrics

nullableBytesLenAt := len(dst)
dst = append(dst, 0, 0, 0, 0) // nullable bytes len
for i, pnr := range r.records {
@@ -1860,8 +1908,12 @@
r.attrs |= 0b1000_0000
}

+	toCompress := dst[nullableBytesLenAt+4:] // skip nullable bytes leading prefix
	m.NumRecords = len(r.records)
	m.UncompressedBytes = len(toCompress)
	m.CompressedBytes = m.UncompressedBytes

	if compressor != nil {
-		toCompress := dst[nullableBytesLenAt+4:] // skip nullable bytes leading prefix
w := sliceWriters.Get().(*sliceWriter)
defer sliceWriters.Put(w)

@@ -1875,6 +1927,9 @@
if compressed != nil &&
int(wrappedLength) < len(toCompress) {

m.CompressedBytes = int(wrappedLength)
m.CompressionType = uint8(codec)

r.attrs |= int16(codec)

dst = appendMessageTo(
Expand All @@ -1889,7 +1944,7 @@ func (r seqRecBatch) appendToAsMessageSet(dst []byte, version uint8, compressor
}

kbin.AppendInt32(dst[:nullableBytesLenAt], int32(len(dst[nullableBytesLenAt+4:])))
-	return dst
+	return dst, m
}

func appendMessageTo(

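To close the loop, here is a hypothetical end-to-end sketch that registers a hook (via the client's existing WithHooks option) and aggregates compressed bytes per topic to derive produce throughput. Because OnProduceBatchWritten is dispatched from a goroutine (see the deferred dispatch in handleReqResp above), shared state needs locking. The broker address and all type/variable names are illustrative.

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// throughputHook sums compressed bytes written per topic; a snapshot
// divided by elapsed time yields bytes/sec per topic.
type throughputHook struct {
	mu    sync.Mutex
	bytes map[string]int64
}

func (h *throughputHook) OnProduceBatchWritten(_ kgo.BrokerMetadata, topic string, _ int32, m kgo.ProduceBatchMetrics) {
	h.mu.Lock() // hooks can fire concurrently; guard the map
	defer h.mu.Unlock()
	h.bytes[topic] += int64(m.CompressedBytes)
}

func main() {
	hook := &throughputHook{bytes: make(map[string]int64)}
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder address
		kgo.WithHooks(hook),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	start := time.Now()
	// ... produce records with cl.Produce here ...

	hook.mu.Lock()
	defer hook.mu.Unlock()
	elapsed := time.Since(start).Seconds()
	for topic, n := range hook.bytes {
		fmt.Printf("%s: %.0f bytes/sec\n", topic, float64(n)/elapsed)
	}
}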