From 8edf93461fc16de8cd949b0de49f455b36cd1e33 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 27 Nov 2021 11:50:04 -0700 Subject: [PATCH] txn: allow more commit errors to just trigger abort Some commit errors, while rare, are perfectly normal and should trigger an EndTxn abort rather than a hard fatal client error. We already handled this case for IllegalGeneration, which would occur if our commit finished after a rebalance. We also now handle RebalanceInProgress (commit finishes *during* a rebalance), as well as all errors that result from the request repeatedly failing. --- pkg/kgo/txn.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 6ec80eaf..fb7dc15a 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -249,7 +249,7 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry postcommit := s.cl.UncommittedOffsets() s.failMu.Unlock() - var oldGeneration bool + var hasAbortableCommitErr bool var commitErr error var g *groupConsumer @@ -267,12 +267,16 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry for _, t := range resp.Topics { for _, p := range t.Partitions { - if err := kerr.ErrorForCode(p.ErrorCode); err != nil { - if err == kerr.IllegalGeneration { - oldGeneration = true - } else { - commitErrs = append(commitErrs, fmt.Sprintf("topic %s partition %d: %v", t.Topic, p.Partition, err)) - } + switch err := kerr.ErrorForCode(p.ErrorCode); err { + case nil: + case kerr.IllegalGeneration, // rebalance begun & completed before we committed + kerr.RebalanceInProgress, // in rebalance, abort & retry later + kerr.CoordinatorNotAvailable, // req failed too many times (same for next two) + kerr.CoordinatorLoadInProgress, + kerr.NotCoordinator: + hasAbortableCommitErr = true + default: + commitErrs = append(commitErrs, fmt.Sprintf("topic %s partition %d: %v", t.Topic, p.Partition, err)) } } } @@ -316,7 +320,7 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry s.failMu.Lock() defer s.failMu.Unlock() - tryCommit := !s.failed() && commitErr == nil && !oldGeneration && okHeartbeat + tryCommit := !s.failed() && commitErr == nil && !hasAbortableCommitErr && okHeartbeat willTryCommit := wantCommit && tryCommit s.cl.cfg.logger.Log(LogLevelInfo, "transaction session ending",