From 8770662993ad63bc31fb0e41a7f201d520b7e98f Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Wed, 14 Jul 2021 22:14:16 -0600 Subject: [PATCH] producing: only opt in to more than 1req if idempotent Previously, we relied on the produce response being v4 or higher to say "ok we are talking to kafka v1+ and we can always be idempotent!". We actually want to opt in to more requests only if we ourselves are idempotent. To detect this, we only opt up if the request was issued with a producer ID AND the response is v4 or higher. Ensuring we issued with a producer ID ensures that we are idempotent. This also simplifies some logic around where the idempotent fields are / how they are used. --- pkg/kgo/sink.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index d2a3d023..65f217e1 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -78,7 +78,6 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti producerID: id, producerEpoch: epoch, - idempotent: s.cl.idempotent(), compressor: s.cl.compressor, @@ -479,10 +478,10 @@ func (s *sink) requeueUnattemptedReq(req *produceRequest, err error) { // outside of a small window during the store, but some pages in the Kafka // confluence basically show that more than two in flight has marginal benefit // anyway (although that may be due to their Java API). 
-func (s *sink) firstRespCheck(version int16) { +func (s *sink) firstRespCheck(idempotent bool, version int16) { if s.produceVersion < 0 { // this is the only place this can be checked non-atomically atomic.StoreInt32(&s.produceVersion, int32(version)) - if version >= 4 { + if idempotent && version >= 4 { s.inflightSem.Store(make(chan struct{}, 4)) } } @@ -514,7 +513,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response s.handleReqClientErr(req, err) return } - s.firstRespCheck(req.version) + s.firstRespCheck(req.idempotent(), req.version) atomic.StoreUint32(&s.consecutiveFailures, 0) var b *bytes.Buffer @@ -1350,7 +1349,6 @@ type produceRequest struct { producerID int64 producerEpoch int16 - idempotent bool // Initialized in AppendTo, metrics tracks uncompressed & compressed // sizes (in byteS) of each batch. @@ -1368,6 +1366,8 @@ type produceRequest struct { wireLengthLimit int32 } +func (r *produceRequest) idempotent() bool { return r.producerID >= 0 } + func (r *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch *recBatch) bool { batchWireLength, flexible := batch.wireLengthForProduceVersion(produceVersion) batchWireLength += 4 // int32 partition prefix @@ -1396,7 +1396,7 @@ func (r *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch } if recBuf.batches[0] == batch { - if !r.idempotent || batch.canFailFromLoadErrs { + if !r.idempotent() || batch.canFailFromLoadErrs { if err := batch.maybeFailErr(&batch.owner.cl.cfg); err != nil { recBuf.failAllRecords(err) return false @@ -1715,7 +1715,7 @@ func (p *produceRequest) AppendTo(dst []byte) []byte { if p.version < 3 { dst, pmetrics = batch.appendToAsMessageSet(dst, uint8(p.version), p.compressor) } else { - dst, pmetrics = batch.appendTo(dst, p.version, p.producerID, p.producerEpoch, p.idempotent, p.txnID != nil, p.compressor) + dst, pmetrics = batch.appendTo(dst, p.version, p.producerID, p.producerEpoch, p.txnID != nil, p.compressor) } 
batch.mu.Unlock() tmetrics[partition] = pmetrics @@ -1746,7 +1746,6 @@ func (r seqRecBatch) appendTo( version int16, producerID int64, producerEpoch int16, - idempotent bool, transactional bool, compressor *compressor, ) (dst []byte, m ProduceBatchMetrics) { // named return so that our defer for flexible versions can modify it @@ -1816,7 +1815,7 @@ func (r seqRecBatch) appendTo( dst = kbin.AppendInt64(dst, r.firstTimestamp+int64(lastRecord.timestampDelta)) seq := r.seq - if !idempotent { // producerID and producerEpoch are already -1 if idempotent (due to producerID() itself returning -1) + if producerID < 0 { // a negative producer ID means we are not using idempotence seq = 0 } dst = kbin.AppendInt64(dst, producerID)