Skip to content

Commit

Permalink
txn: allow more commit errors to just trigger abort
Browse files Browse the repository at this point in the history
Some commit errors, while rare, are perfectly normal and should trigger
an EndTxn abort rather than a hard fatal client error.

We already handled this case for IllegalGeneration, which occurs if a
rebalance begins and completes before our commit finishes. We now also
handle RebalanceInProgress (the commit finishes *during* a rebalance), as
well as the errors that result from the request repeatedly failing:
CoordinatorNotAvailable, CoordinatorLoadInProgress, and NotCoordinator.
  • Loading branch information
twmb committed Nov 27, 2021
1 parent 03c58cb commit 8edf934
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
postcommit := s.cl.UncommittedOffsets()
s.failMu.Unlock()

var oldGeneration bool
var hasAbortableCommitErr bool
var commitErr error
var g *groupConsumer

Expand All @@ -267,12 +267,16 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry

for _, t := range resp.Topics {
for _, p := range t.Partitions {
if err := kerr.ErrorForCode(p.ErrorCode); err != nil {
if err == kerr.IllegalGeneration {
oldGeneration = true
} else {
commitErrs = append(commitErrs, fmt.Sprintf("topic %s partition %d: %v", t.Topic, p.Partition, err))
}
switch err := kerr.ErrorForCode(p.ErrorCode); err {
case nil:
case kerr.IllegalGeneration, // rebalance begun & completed before we committed
kerr.RebalanceInProgress, // in rebalance, abort & retry later
kerr.CoordinatorNotAvailable, // req failed too many times (same for next two)
kerr.CoordinatorLoadInProgress,
kerr.NotCoordinator:
hasAbortableCommitErr = true
default:
commitErrs = append(commitErrs, fmt.Sprintf("topic %s partition %d: %v", t.Topic, p.Partition, err))
}
}
}
Expand Down Expand Up @@ -316,7 +320,7 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
s.failMu.Lock()
defer s.failMu.Unlock()

tryCommit := !s.failed() && commitErr == nil && !oldGeneration && okHeartbeat
tryCommit := !s.failed() && commitErr == nil && !hasAbortableCommitErr && okHeartbeat
willTryCommit := wantCommit && tryCommit

s.cl.cfg.logger.Log(LogLevelInfo, "transaction session ending",
Expand Down

0 comments on commit 8edf934

Please sign in to comment.