From 029f5e38eb16d18697729e85560524691dda1252 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 20 Aug 2021 13:27:22 -0600 Subject: [PATCH] sink: fix decInflight double decrement bug Commit a3d9f7363e45ea15423dd11fba7ca400bfea1dc7 accidentally introduced a double decrement: we would handle a batch (first decrement), and if we needed to retry the batch, we would add it to be retried and decrement again. This would result in a negative inflight count, and when the partition migrated to a different sink, it would never start draining because the inflight count would never hit 0 and thus it would always be "inflight" on the old sink. The panic that I added to catch a negative inflight count was three lines too low; the conditional just before the panic returned on any non-zero count, negative counts included, so the panic could never fire. We now move the panic above that conditional to ensure it will be hit just in case, but more importantly we simplify the inflight decrementing. This commit adds a new batchesFlat field to the produceRequest. batchesFlat is not modified after the request is issued, so now we can just range over all the batches that were in the request when handling the response and decrement the inflight count for all of them in one spot. This replaces the three places where we were decrementing inflight with one. --- pkg/kgo/sink.go | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 4643250a..57403573 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -413,6 +413,9 @@ func (s *sink) doTxnReq( }) } +// Removing a batch from the transaction means we will not be issuing it +// inflight, and that it was not added to the txn and that we need to reset the +// drain index. 
func (b seqRecBatch) removeFromTxn() { b.owner.addedToTxn = false b.owner.resetBatchDrainIdx() @@ -529,7 +532,6 @@ func (s *sink) handleReqRespNoack(b *bytes.Buffer, debug bool, req *produceReque } else if debug { fmt.Fprintf(b, "%d{skipped}, ", partition) } - batch.decInflight() batch.owner.mu.Unlock() } if debug { @@ -542,6 +544,11 @@ func (s *sink) handleReqRespNoack(b *bytes.Buffer, debug bool, req *produceReque } func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response, err error) { + // The last thing we do before returning is to decrement the inflight + // count on all used batches. This func uses batchesFlat, meaning map + // deletions below do not affect us. + defer req.decBatchesInflight() + if err != nil { s.handleReqClientErr(req, err) return @@ -644,7 +651,6 @@ func (s *sink) handleReqRespBatch( ) (retry, didProduce bool) { batch.owner.mu.Lock() defer batch.owner.mu.Unlock() - defer batch.decInflight() nrec := len(batch.records) @@ -873,8 +879,6 @@ func (s *sink) handleRetryBatches( ) { var needsMetaUpdate bool retry.eachOwnerLocked(func(batch seqRecBatch) { - defer batch.decInflight() - if !batch.isOwnersFirstBatch() { return } @@ -1363,18 +1367,18 @@ func (b *recBatch) isTimedOut(limit time.Duration) bool { // // This is always called in the produce request path, not anywhere else (i.e. // not failAllRecords). We want inflight decrementing to be the last thing that -// happens always for every request It does not matter if the records were +// happens always for every request. It does not matter if the records were // independently failed: from the request issuing perspective, the batch is // still inflight. 
func (b *recBatch) decInflight() { recBuf := b.owner recBuf.inflight-- - if recBuf.inflight != 0 { - return - } if recBuf.inflight < 0 { panic("record buffer went negative inflight!") } + if recBuf.inflight != 0 { + return + } oldSink := recBuf.inflightOnSink recBuf.inflightOnSink = nil if oldSink != recBuf.sink && recBuf.batchDrainIdx != len(recBuf.batches) { @@ -1395,10 +1399,11 @@ type produceRequest struct { backoffSeq uint32 - txnID *string - acks int16 - timeout int32 - batches seqRecBatches + txnID *string + acks int16 + timeout int32 + batches seqRecBatches + batchesFlat []*recBatch producerID int64 producerEpoch int16 @@ -1497,9 +1502,19 @@ func (r *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch recBuf.seq, batch, ) + r.batchesFlat = append(r.batchesFlat, batch) return true } +// decBatchesInflight is called at the end of handling a req response. +func (r *produceRequest) decBatchesInflight() { + for _, batch := range r.batchesFlat { + batch.owner.mu.Lock() + batch.decInflight() + batch.owner.mu.Unlock() + } +} + // seqRecBatch: a recBatch with a sequence number. type seqRecBatch struct { seq int32