From bd1d43d4b8d2d14905c4f425bfd864aab84ee422 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 21 Mar 2022 15:03:22 -0600 Subject: [PATCH] sink: small AddPartitionsToTxn improvements If batches were stripped, we need to ensure we keep draining (because we know there are records to produce: what we stripped). However, if we stripped everything, we should back off to trigger a metadata update and avoid spin-looping. --- pkg/kgo/sink.go | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 0a685565..03aee1d7 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -313,12 +313,13 @@ func (s *sink) produce(sem <-chan struct{}) bool { // here, all buffered records must fail. // We do not need to clear the addedToTxn flag for any recBuf // it was set on, since producer id recovery resets the flag. - if err := s.doTxnReq(req, txnReq); err != nil { + batchesStripped, err := s.doTxnReq(req, txnReq) + if err != nil { switch { case isRetriableBrokerErr(err) || isDialErr(err): s.cl.bumpRepeatedLoadErr(err) s.cl.cfg.logger.Log(LogLevelWarn, "unable to AddPartitionsToTxn due to retriable broker err, bumping client's buffered record load errors by 1 and retrying", "err", err) - return moreToDrain || len(req.batches) > 0 + return moreToDrain || len(req.batches) > 0 // nothing stripped if request-issuing error default: // Note that err can be InvalidProducerEpoch, which is // potentially recoverable in EndTransaction. @@ -331,6 +332,17 @@ func (s *sink) produce(sem <-chan struct{}) bool { } return false } + + // If we stripped everything, ensure we back off to force a + // metadata load. If not everything was stripped, we issue our + // request and ensure we will retry producing until + // everything is stripped (and we eventually back off). 
+ if batchesStripped { + moreToDrain = true + if len(req.batches) == 0 { + s.maybeTriggerBackoff(s.backoffSeq) + } + } } if len(req.batches) == 0 { // txn req could have removed some partitions to retry later (unknown topic, etc.) @@ -398,7 +410,7 @@ start: func (s *sink) doTxnReq( req *produceRequest, txnReq *kmsg.AddPartitionsToTxnRequest, -) (err error) { +) (stripped bool, err error) { // If we return an unretriable error, then we have to reset everything // to not be in the transaction and begin draining at the start. // @@ -409,9 +421,11 @@ func (s *sink) doTxnReq( req.batches.eachOwnerLocked(seqRecBatch.removeFromTxn) } }() - return s.cl.doWithConcurrentTransactions("AddPartitionsToTxn", func() error { - return s.issueTxnReq(req, txnReq) + err = s.cl.doWithConcurrentTransactions("AddPartitionsToTxn", func() error { + stripped, err = s.issueTxnReq(req, txnReq) + return err }) + return stripped, err } // Removing a batch from the transaction means we will not be issuing it @@ -426,10 +440,10 @@ func (b *recBatch) removeFromTxn() { func (s *sink) issueTxnReq( req *produceRequest, txnReq *kmsg.AddPartitionsToTxnRequest, -) error { +) (stripped bool, fatalErr error) { resp, err := txnReq.RequestWith(s.cl.ctx, s.cl) if err != nil { - return err + return false, err } for _, topic := range resp.Topics { @@ -444,7 +458,8 @@ func (s *sink) issueTxnReq( // if any partition is unauthorized _or_ does not exist. We simply remove // unattempted partitions and treat them as retriable. if !kerr.IsRetriable(err) && !errors.Is(err, kerr.OperationNotAttempted) { - return err // auth err, etc. 
+ fatalErr = err // auth err, etc + continue } batch, ok := topicBatches[partition.Partition] @@ -459,6 +474,8 @@ func (s *sink) issueTxnReq( batch.removeFromTxn() batch.owner.mu.Unlock() + stripped = true + delete(topicBatches, partition.Partition) } if len(topicBatches) == 0 { @@ -466,7 +483,7 @@ func (s *sink) issueTxnReq( } } } - return nil + return stripped, fatalErr } // firstRespCheck is effectively a sink.Once. On the first response, if the