diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index d2a3d023..65f217e1 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -78,7 +78,6 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti producerID: id, producerEpoch: epoch, - idempotent: s.cl.idempotent(), compressor: s.cl.compressor, @@ -479,10 +478,10 @@ func (s *sink) requeueUnattemptedReq(req *produceRequest, err error) { // outside of a small window during the store, but some pages in the Kafka // confluence basically show that more than two in flight has marginal benefit // anyway (although that may be due to their Java API). -func (s *sink) firstRespCheck(version int16) { +func (s *sink) firstRespCheck(idempotent bool, version int16) { if s.produceVersion < 0 { // this is the only place this can be checked non-atomically atomic.StoreInt32(&s.produceVersion, int32(version)) - if version >= 4 { + if idempotent && version >= 4 { s.inflightSem.Store(make(chan struct{}, 4)) } } @@ -514,7 +513,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response s.handleReqClientErr(req, err) return } - s.firstRespCheck(req.version) + s.firstRespCheck(req.idempotent(), req.version) atomic.StoreUint32(&s.consecutiveFailures, 0) var b *bytes.Buffer @@ -1350,7 +1349,6 @@ type produceRequest struct { producerID int64 producerEpoch int16 - idempotent bool // Initialized in AppendTo, metrics tracks uncompressed & compressed // sizes (in byteS) of each batch. @@ -1368,6 +1366,8 @@ type produceRequest struct { wireLengthLimit int32 } +func (r *produceRequest) idempotent() bool { return r.producerID >= 0 } + func (r *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch *recBatch) bool { batchWireLength, flexible := batch.wireLengthForProduceVersion(produceVersion) batchWireLength += 4 // int32 partition prefix @@ -1396,7 +1396,7 @@ func (r *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch } if recBuf.batches[0] == batch { - if !r.idempotent || batch.canFailFromLoadErrs { + if !r.idempotent() || batch.canFailFromLoadErrs { if err := batch.maybeFailErr(&batch.owner.cl.cfg); err != nil { recBuf.failAllRecords(err) return false @@ -1715,7 +1715,7 @@ func (p *produceRequest) AppendTo(dst []byte) []byte { if p.version < 3 { dst, pmetrics = batch.appendToAsMessageSet(dst, uint8(p.version), p.compressor) } else { - dst, pmetrics = batch.appendTo(dst, p.version, p.producerID, p.producerEpoch, p.idempotent, p.txnID != nil, p.compressor) + dst, pmetrics = batch.appendTo(dst, p.version, p.producerID, p.producerEpoch, p.txnID != nil, p.compressor) } batch.mu.Unlock() tmetrics[partition] = pmetrics @@ -1746,7 +1746,6 @@ func (r seqRecBatch) appendTo( version int16, producerID int64, producerEpoch int16, - idempotent bool, transactional bool, compressor *compressor, ) (dst []byte, m ProduceBatchMetrics) { // named return so that our defer for flexible versions can modify it @@ -1816,7 +1815,7 @@ func (r seqRecBatch) appendTo( dst = kbin.AppendInt64(dst, r.firstTimestamp+int64(lastRecord.timestampDelta)) seq := r.seq - if !idempotent { // producerID and producerEpoch are already -1 if idempotent (due to producerID() itself returning -1) + if producerID < 0 { // a negative producer ID means we are not using idempotence seq = 0 } dst = kbin.AppendInt64(dst, producerID)