Skip to content

Commit

Permalink
sink: fix decInflight double decrement bug
Browse files Browse the repository at this point in the history
a3d9f73 accidentally introduced a
double decrement: we would handle a batch (first decrement), and if we
needed to retry the batch, we would add it to be retried and decrement
again. This would result in a negative inflight, and when the partition
migrated to a different sink, it would never start draining because the
inflight count would never hit 0 and thus it would always be "inflight"
on the old sink. The panic that I added to catch a negative inflight was
three lines too low; the conditional just before the panic would return
on non-zero.

We now move the panic up to ensure it will be hit just in case, but more
importantly we simplify the inflight decrementing. This commit adds a
new batchesFlat field to the produceRequest. batchesFlat is not modified
after being issued, so now we can just range over all the batches that
were in the request when handling the response and decrement them all
inflight in one spot.

This replaces three places we were decrementing inflight with one.
  • Loading branch information
twmb committed Aug 20, 2021
1 parent 2cf62e2 commit 029f5e3
Showing 1 changed file with 27 additions and 12 deletions.
39 changes: 27 additions & 12 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,9 @@ func (s *sink) doTxnReq(
})
}

// Removing a batch from the transaction means we will not be issuing it
// inflight, and that it was not added to the txn and that we need to reset the
// drain index.
func (b seqRecBatch) removeFromTxn() {
b.owner.addedToTxn = false
b.owner.resetBatchDrainIdx()
Expand Down Expand Up @@ -529,7 +532,6 @@ func (s *sink) handleReqRespNoack(b *bytes.Buffer, debug bool, req *produceReque
} else if debug {
fmt.Fprintf(b, "%d{skipped}, ", partition)
}
batch.decInflight()
batch.owner.mu.Unlock()
}
if debug {
Expand All @@ -542,6 +544,11 @@ func (s *sink) handleReqRespNoack(b *bytes.Buffer, debug bool, req *produceReque
}

func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response, err error) {
// The last thing we do before returning is to decrement the inflight
// count on all used batches. This func uses batchesFlat, meaning map
// deletions below do not affect us.
defer req.decBatchesInflight()

if err != nil {
s.handleReqClientErr(req, err)
return
Expand Down Expand Up @@ -644,7 +651,6 @@ func (s *sink) handleReqRespBatch(
) (retry, didProduce bool) {
batch.owner.mu.Lock()
defer batch.owner.mu.Unlock()
defer batch.decInflight()

nrec := len(batch.records)

Expand Down Expand Up @@ -873,8 +879,6 @@ func (s *sink) handleRetryBatches(
) {
var needsMetaUpdate bool
retry.eachOwnerLocked(func(batch seqRecBatch) {
defer batch.decInflight()

if !batch.isOwnersFirstBatch() {
return
}
Expand Down Expand Up @@ -1363,18 +1367,18 @@ func (b *recBatch) isTimedOut(limit time.Duration) bool {
//
// This is always called in the produce request path, not anywhere else (i.e.
// not failAllRecords). We want inflight decrementing to be the last thing that
// happens always for every request It does not matter if the records were
// happens always for every request. It does not matter if the records were
// independently failed: from the request issuing perspective, the batch is
// still inflight.
func (b *recBatch) decInflight() {
recBuf := b.owner
recBuf.inflight--
if recBuf.inflight != 0 {
return
}
if recBuf.inflight < 0 {
panic("record buffer went negative inflight!")
}
if recBuf.inflight != 0 {
return
}
oldSink := recBuf.inflightOnSink
recBuf.inflightOnSink = nil
if oldSink != recBuf.sink && recBuf.batchDrainIdx != len(recBuf.batches) {
Expand All @@ -1395,10 +1399,11 @@ type produceRequest struct {

backoffSeq uint32

txnID *string
acks int16
timeout int32
batches seqRecBatches
txnID *string
acks int16
timeout int32
batches seqRecBatches
batchesFlat []*recBatch

producerID int64
producerEpoch int16
Expand Down Expand Up @@ -1497,9 +1502,19 @@ func (r *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch
recBuf.seq,
batch,
)
r.batchesFlat = append(r.batchesFlat, batch)
return true
}

// decBatchesInflight is called at the end of handling a req response.
func (r *produceRequest) decBatchesInflight() {
for _, batch := range r.batchesFlat {
batch.owner.mu.Lock()
batch.decInflight()
batch.owner.mu.Unlock()
}
}

// seqRecBatch: a recBatch with a sequence number.
type seqRecBatch struct {
seq int32
Expand Down

0 comments on commit 029f5e3

Please sign in to comment.