diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 6ec80eaf..fb7dc15a 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -249,7 +249,7 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry postcommit := s.cl.UncommittedOffsets() s.failMu.Unlock() - var oldGeneration bool + var hasAbortableCommitErr bool var commitErr error var g *groupConsumer @@ -267,12 +267,16 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry for _, t := range resp.Topics { for _, p := range t.Partitions { - if err := kerr.ErrorForCode(p.ErrorCode); err != nil { - if err == kerr.IllegalGeneration { - oldGeneration = true - } else { - commitErrs = append(commitErrs, fmt.Sprintf("topic %s partition %d: %v", t.Topic, p.Partition, err)) - } + switch err := kerr.ErrorForCode(p.ErrorCode); err { + case nil: + case kerr.IllegalGeneration, // rebalance begun & completed before we committed + kerr.RebalanceInProgress, // in rebalance, abort & retry later + kerr.CoordinatorNotAvailable, // req failed too many times (same for next two) + kerr.CoordinatorLoadInProgress, + kerr.NotCoordinator: + hasAbortableCommitErr = true + default: + commitErrs = append(commitErrs, fmt.Sprintf("topic %s partition %d: %v", t.Topic, p.Partition, err)) } } } @@ -316,7 +320,7 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry s.failMu.Lock() defer s.failMu.Unlock() - tryCommit := !s.failed() && commitErr == nil && !oldGeneration && okHeartbeat + tryCommit := !s.failed() && commitErr == nil && !hasAbortableCommitErr && okHeartbeat willTryCommit := wantCommit && tryCommit s.cl.cfg.logger.Log(LogLevelInfo, "transaction session ending",