From f29fb7f083ed4f790e506428dfef4e63256f9e4a Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 10 Aug 2021 13:10:34 -0600 Subject: [PATCH] sink: fix out-of-order response handling across partition rebalances If a recBuf moves from one sink to another while requests are inflight, there is a pathological case that allows responses to finish out of order. A lot of assumptions in the code exist such that all requests will be processed in order: this is the guarantee of handleSeqResps. However, handleSeqResps only works per sink. If a recBuf moves to a different sink while requests are inflight, theoretically it could create a request, issue it, receive a response, and handle that response before the active inflight responses are received / handled on the original sink. In the worst case, this could lead to panics: A: has two requests in flight A: first request fails, reset the drain index to 0 metadata update moves partition to sink B B: issues and finishes first request B: issues second request. This is duplicate to A's second request. A: second request finally fails, resets the drain index to 0 B: second request finishes successfully, decrements batch drain index The next time the recBuf is used, the batchDrainIdx will be negative and the client will panic. It is difficult to imagine a scenario that has this same failure case without multiple requests inflight. Regardless, the code now does not start issuing requests on the new sink until all requests on the old have finished. This moves closer to the guarantee that all requests are handled in order: we remove the cross-sink failure case. --- pkg/kgo/sink.go | 117 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 93 insertions(+), 24 deletions(-) diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 65f217e1..43eeeb88 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -101,7 +101,7 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti recBufsIdx = (recBufsIdx + 1) % len(s.recBufs) recBuf.mu.Lock() - if recBuf.failing || len(recBuf.batches) == recBuf.batchDrainIdx { + if recBuf.failing || len(recBuf.batches) == recBuf.batchDrainIdx || recBuf.inflightOnSink != nil && recBuf.inflightOnSink != s { recBuf.mu.Unlock() continue } @@ -113,6 +113,9 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti continue } + recBuf.inflightOnSink = s + recBuf.inflight++ + recBuf.batchDrainIdx++ recBuf.seq += int32(len(batch.records)) moreToDrain = moreToDrain || recBuf.tryStopLingerForDraining() @@ -394,7 +397,21 @@ more: func (s *sink) doTxnReq( req *produceRequest, txnReq *kmsg.AddPartitionsToTxnRequest, -) error { +) (err error) { + // If we return an unretriable error, then we have to reset everything + // to not be in the transaction and begin draining at the start. + // + // These batches must be the first in their recBuf, because we would + // not be trying to add them to a partition if they were not. + defer func() { + if err != nil { + req.batches.eachOwnerLocked(func(batch seqRecBatch) { + batch.owner.addedToTxn = false + batch.owner.resetBatchDrainIdx() + batch.decInflight() + }) + } + }() return s.cl.doWithConcurrentTransactions("AddPartitionsToTxn", func() error { return s.issueTxnReq(req, txnReq) }) @@ -430,16 +447,12 @@ func (s *sink) issueTxnReq( continue } - // If we did not add this partition to the txn, then - // this must be the first batch in the recBuf, because - // this is the first time seeing it, which is why we - // are trying to add it to the txn. - // - // Thus, we simply set that this is **not** added, and - // we reset the drain index to re-try. + // We are stripping this retriable-err batch from the request, + // so we must reset that it has been added to the txn. batch.owner.mu.Lock() batch.owner.addedToTxn = false batch.owner.resetBatchDrainIdx() + batch.decInflight() batch.owner.mu.Unlock() delete(topicBatches, partition.Partition) @@ -569,6 +582,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response } else if debug { fmt.Fprintf(b, "%d{skipped}, ", partition) } + batch.decInflight() batch.owner.mu.Unlock() } if debug { @@ -659,6 +673,7 @@ func (s *sink) handleReqRespBatch( ) (retry, didProduce bool) { batch.owner.mu.Lock() defer batch.owner.mu.Unlock() + defer batch.decInflight() nrec := len(batch.records) @@ -970,6 +985,19 @@ type recBuf struct { // This exists to aid in removing the buffer from the sink. recBufsIdx int + // A concurrent metadata update can move a recBuf from one sink to + // another wile requests are inflight on the original sink. We do not + // want to allow new requests to start on the new sink until they all + // finish on the old, because with some pathological request order + // finishing, we would allow requests to finish out of order: + // handleSeqResps works per sink, not across sinks. + inflightOnSink *sink + // Inflight tracks the number of requests inflight using batches from + // this recBuf. Every time this hits zero, if the batchDrainIdx is not + // at the end, we clear inflightOnSink and trigger the *current* sink + // to drain. + inflight uint8 + topicPartitionData // updated in metadata migrateProductionTo (same spot sink is updated) // seq is used for the seq in each record batch. It is incremented when @@ -1136,7 +1164,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) { batch0 := recBuf.batches[0] batch0.tries++ failErr := batch0.maybeFailErr(&recBuf.cl.cfg) - if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) && (!kerr.IsRetriable(err) || failErr != nil) { + if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) && (failErr != nil || !isRetriableBrokerErr(err) && !kerr.IsRetriable(err)) { recBuf.failAllRecords(err) } } @@ -1329,6 +1357,30 @@ func (b *recBatch) isTimedOut(limit time.Duration) bool { return time.Since(b.records[0].Timestamp) > limit } +// Decrements the inflight count for this batch. +// +// If the inflight count hits zero, this potentially re-triggers a drain on the +// *current* sink. A concurrent metadata update could have moved the recBuf to +// a different sink; that sink will not drain this recBuf until all requests on +// the old sink are finished. +// +// This is always called in the produce request path, not anywhere else (i.e. +// not failAllRecords). We want inflight decrementing to be the last thing that +// happens always for every request It does not matter if the records were +// independently failed: from the request issuing perspective, the batch is +// still inflight. +func (b *recBatch) decInflight() { + recBuf := b.owner + recBuf.inflight-- + if recBuf.inflight != 0 { + return + } + recBuf.inflightOnSink = nil + if recBuf.batchDrainIdx != len(recBuf.batches) { + recBuf.sink.maybeDrain() + } +} + //////////////////// // produceRequest // //////////////////// @@ -1453,30 +1505,47 @@ func (rbs *seqRecBatches) addSeqBatch(topic string, part int32, batch seqRecBatc topicBatches[part] = batch } -// Resets the drain index for any batch that is the first in its record buffer. +// Resets the drain index for any batch that is the first in its record buffer, +// as well as decrements the inflight for all batches always. // // If idempotency is disabled, if a batch is timed out or hit the retry limit, // we fail it and anything after it. func (rbs seqRecBatches) tryResetFailingBatchesWith(cfg *cfg, canFail bool, fn func(seqRecBatch)) { + rbs.eachOwnerLocked(func(batch seqRecBatch) { + defer batch.decInflight() + + if !batch.isOwnersFirstBatch() { + return + } + + if canFail || cfg.disableIdempotency { + if err := batch.maybeFailErr(cfg); err != nil { + batch.owner.failAllRecords(err) + return + } + } + + batch.owner.resetBatchDrainIdx() + fn(batch) + }) +} + +func (rbs seqRecBatches) each(fn func(seqRecBatch)) { for _, partitions := range rbs { for _, batch := range partitions { - batch.owner.mu.Lock() - if batch.isOwnersFirstBatch() { - if canFail || cfg.disableIdempotency { - if err := batch.maybeFailErr(cfg); err != nil { - batch.owner.failAllRecords(err) - batch.owner.mu.Unlock() - continue - } - } - batch.owner.resetBatchDrainIdx() - fn(batch) - } - batch.owner.mu.Unlock() + fn(batch) } } } +func (rbs seqRecBatches) eachOwnerLocked(fn func(seqRecBatch)) { + rbs.each(func(batch seqRecBatch) { + batch.owner.mu.Lock() + defer batch.owner.mu.Unlock() + fn(batch) + }) +} + ////////////// // COUNTING // - this section is all about counting how bytes lay out on the wire //////////////