Skip to content

Commit

Permalink
sink: small AddPartitionsToTxn improvements
Browse files Browse the repository at this point in the history
If batches were stripped, we need to ensure we keep draining (because we
know there are records to produce: what we stripped). However, if we
stripped everything, we should back off to trigger a metadata update and
avoid spin-looping.
  • Loading branch information
twmb committed Mar 21, 2022
1 parent 65ca0bd commit bd1d43d
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,13 @@ func (s *sink) produce(sem <-chan struct{}) bool {
// here, all buffered records must fail.
// We do not need to clear the addedToTxn flag for any recBuf
// it was set on, since producer id recovery resets the flag.
if err := s.doTxnReq(req, txnReq); err != nil {
batchesStripped, err := s.doTxnReq(req, txnReq)
if err != nil {
switch {
case isRetriableBrokerErr(err) || isDialErr(err):
s.cl.bumpRepeatedLoadErr(err)
s.cl.cfg.logger.Log(LogLevelWarn, "unable to AddPartitionsToTxn due to retriable broker err, bumping client's buffered record load errors by 1 and retrying", "err", err)
return moreToDrain || len(req.batches) > 0
return moreToDrain || len(req.batches) > 0 // nothing stripped if request-issuing error
default:
// Note that err can be InvalidProducerEpoch, which is
// potentially recoverable in EndTransaction.
Expand All @@ -331,6 +332,17 @@ func (s *sink) produce(sem <-chan struct{}) bool {
}
return false
}

// If we stripped everything, ensure we back off to force a
// metadata load. If not everything was stripped, we issue our
// request and ensure we will retry producing until
// everything is stripped (and we eventually back off).
if batchesStripped {
moreToDrain = true
if len(req.batches) == 0 {
s.maybeTriggerBackoff(s.backoffSeq)
}
}
}

if len(req.batches) == 0 { // txn req could have removed some partitions to retry later (unknown topic, etc.)
Expand Down Expand Up @@ -398,7 +410,7 @@ start:
func (s *sink) doTxnReq(
req *produceRequest,
txnReq *kmsg.AddPartitionsToTxnRequest,
) (err error) {
) (stripped bool, err error) {
// If we return an unretriable error, then we have to reset everything
// to not be in the transaction and begin draining at the start.
//
Expand All @@ -409,9 +421,11 @@ func (s *sink) doTxnReq(
req.batches.eachOwnerLocked(seqRecBatch.removeFromTxn)
}
}()
return s.cl.doWithConcurrentTransactions("AddPartitionsToTxn", func() error {
return s.issueTxnReq(req, txnReq)
err = s.cl.doWithConcurrentTransactions("AddPartitionsToTxn", func() error {
stripped, err = s.issueTxnReq(req, txnReq)
return err
})
return stripped, err
}

// Removing a batch from the transaction means we will not be issuing it
Expand All @@ -426,10 +440,10 @@ func (b *recBatch) removeFromTxn() {
func (s *sink) issueTxnReq(
req *produceRequest,
txnReq *kmsg.AddPartitionsToTxnRequest,
) error {
) (stripped bool, fatalErr error) {
resp, err := txnReq.RequestWith(s.cl.ctx, s.cl)
if err != nil {
return err
return false, err
}

for _, topic := range resp.Topics {
Expand All @@ -444,7 +458,8 @@ func (s *sink) issueTxnReq(
// if any partition is unauthorized _or_ does not exist. We simply remove
// unattempted partitions and treat them as retriable.
if !kerr.IsRetriable(err) && !errors.Is(err, kerr.OperationNotAttempted) {
return err // auth err, etc.
fatalErr = err // auth err, etc
continue
}

batch, ok := topicBatches[partition.Partition]
Expand All @@ -459,14 +474,16 @@ func (s *sink) issueTxnReq(
batch.removeFromTxn()
batch.owner.mu.Unlock()

stripped = true

delete(topicBatches, partition.Partition)
}
if len(topicBatches) == 0 {
delete(req.batches, topic.Topic)
}
}
}
return nil
return stripped, fatalErr
}

// firstRespCheck is effectively a sink.Once. On the first response, if the
Expand Down

0 comments on commit bd1d43d

Please sign in to comment.