From 939cba2da78da04db14e38cddbc939f0ed85dcc3 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 11 May 2021 18:20:41 -0600 Subject: [PATCH] txns: make even safer (& drop default txn timeout to 40s) See embedded comments; by ensuring our txn timeout is less than the session timeout and the rebalance timeout, then if we force a heartbeat before we send EndTxn, we can be prettyyyyy sure that our transaction will either complete successfully before the next heartbeat, or the transaction will time out. This helps avoid the KIP-447 situation for pre-2.5.0 clusters. --- pkg/kgo/config.go | 12 +++-- pkg/kgo/consumer_group.go | 33 ++++++++++-- pkg/kgo/txn.go | 108 ++++++++++++++++++++++++++++++-------- 3 files changed, 122 insertions(+), 31 deletions(-) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 27830562..ffd879fb 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -302,7 +302,7 @@ func defaultCfg() cfg { metadataMaxAge: 5 * time.Minute, metadataMinAge: 10 * time.Second, - txnTimeout: 60 * time.Second, + txnTimeout: 40 * time.Second, acks: AllISRAcks(), compression: []CompressionCodec{SnappyCompression(), NoCompression()}, maxRecordBatchBytes: 1000000, // Kafka max.message.bytes default is 1000012 @@ -800,10 +800,12 @@ func TransactionalID(id string) ProducerOpt { } // TransactionTimeout sets the allowed for a transaction, overriding the -// default 60s. It may be a good idea to set this under the rebalance timeout -// for a group, so that a produce will not complete successfully after the -// consumer group has already moved the partitions the consumer/producer is -// working on from one group member to another. +// default 40s. It is a good idea to keep this less than a group's session +// timeout, so that a group member will always be alive for the duration of a +// transaction even if connectivity dies. This helps prevent a transaction +// finishing after a rebalance, which is problematic pre-Kafka 2.5.0. If you +// are on Kafka 2.5.0+, then you can use the RequireStableFetchOffsets option +// when assigning the group, and you can set this to whatever you would like. // // Transaction timeouts begin when the first record is produced within a // transaction, not when a transaction begins. diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 1573894c..d61770a6 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -62,8 +62,16 @@ func Balancers(balancers ...GroupBalancer) GroupOpt { // timeout, the broker will remove the member from the group and initiate a // rebalance. // -// This corresponds to Kafka's session.timeout.ms setting and must be within -// the broker's group.min.session.timeout.ms and group.max.session.timeout.ms. +// If you are using a GroupTransactSession for EOS, wish to lower this, and are +// talking to a Kafka cluster pre 2.5.0, consider lowering the +// TransactionTimeout. If you do not, you risk a transaction finishing after a +// group has rebalanced, which could lead to duplicate processing. If you are +// talking to a Kafka 2.5.0+ cluster, you can safely use the +// RequireStableFetchOffsets group option and prevent any problems. +// +// This option corresponds to Kafka's session.timeout.ms setting and must be +// within the broker's group.min.session.timeout.ms and +// group.max.session.timeout.ms. func SessionTimeout(timeout time.Duration) GroupOpt { return groupOpt{func(cfg *groupConsumer) { cfg.sessionTimeout = timeout }} } @@ -283,6 +291,14 @@ type groupConsumer struct { rejoinCh chan struct{} // cap 1; sent to if subscription changes (regex) + // For EOS, before we commit, we force a heartbeat. If the client and + // group member are both configured properly, then the transactional + // timeout will be less than the session timeout. By forcing a + // heartbeat before the commit, if the heartbeat was successful, then + // we ensure that we will complete the transaction within the group + // session, meaning we will not commit after the group has rebalanced. + heartbeatForceCh chan func(error) + // The following two are only updated in the manager / join&sync loop lastAssigned map[string][]int32 // only updated in join&sync loop nowAssigned map[string][]int32 // only updated in join&sync loop @@ -402,9 +418,10 @@ func (cl *Client) AssignGroup(group string, opts ...GroupOpt) { tps: newTopicsPartitions(), - using: make(map[string]int), - rejoinCh: make(chan struct{}, 1), - reSeen: make(map[string]struct{}), + using: make(map[string]int), + rejoinCh: make(chan struct{}, 1), + heartbeatForceCh: make(chan func(error)), + reSeen: make(map[string]struct{}), sessionTimeout: 45000 * time.Millisecond, rebalanceTimeout: 60000 * time.Millisecond, @@ -919,12 +936,15 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio for { var err error + var force func(error) heartbeat = false select { case <-cooperativeFastCheck: heartbeat = true case <-ticker.C: heartbeat = true + case force = <-g.heartbeatForceCh: + heartbeat = true case <-g.rejoinCh: // If a metadata update changes our subscription, // we just pretend we are rebalancing. @@ -959,6 +979,9 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio err = kerr.ErrorForCode(resp.ErrorCode) } g.cl.cfg.logger.Log(LogLevelDebug, "heartbeat complete", "err", err) + if force != nil { + force(err) + } } // The first error either triggers a clean revoke and metadata diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 936367fa..f19c4c6c 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -32,8 +32,12 @@ type GroupTransactSession struct { cooperative bool - revokeMu sync.Mutex - revoked bool + failMu sync.Mutex + + revoked bool + revokedCh chan struct{} // closed once when revoked is set; reset after End + lost bool + lostCh chan struct{} // closed once when lost is set; reset after End } // AssignGroupTransactSession is exactly the same as AssignGroup, but wraps the @@ -76,13 +80,18 @@ func (cl *Client) AssignGroupTransactSession(group string, opts ...GroupOpt) *Gr userRevoked := g.onRevoked g.onRevoked = func(ctx context.Context, rev map[string][]int32) { - s.revokeMu.Lock() - defer s.revokeMu.Unlock() + s.failMu.Lock() + defer s.failMu.Unlock() + if s.revoked { + return + } + if s.cooperative && len(rev) == 0 && !s.revoked { cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke with nothing to revoke; allowing next commit") } else { cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke; aborting next commit if we are currently in a transaction") s.revoked = true + close(s.revokedCh) } if userRevoked != nil { @@ -90,6 +99,25 @@ func (cl *Client) AssignGroupTransactSession(group string, opts ...GroupOpt) *Gr } } + userLost := g.onLost + g.onLost = func(ctx context.Context, lost map[string][]int32) { + s.failMu.Lock() + defer s.failMu.Unlock() + if s.lost { + return + } + + cl.cfg.logger.Log(LogLevelInfo, "transact session in on_lost; aborting next commit if we are currently in a transaction") + s.lost = true + close(s.lostCh) + + if userLost != nil { + userLost(ctx, lost) + } else if userRevoked != nil { + userRevoked(ctx, lost) + } + } + return s } @@ -139,6 +167,10 @@ func (s *GroupTransactSession) Begin() error { return s.cl.BeginTransaction() } +func (s *GroupTransactSession) failed() bool { + return s.revoked || s.lost +} + // End ends a transaction, committing if commit is true, if the group did not // rebalance since the transaction began, and if committing offsets is // successful. If commit is false, the group has rebalanced, or any partition @@ -156,9 +188,12 @@ func (s *GroupTransactSession) Begin() error { // recommended to not cancel it. func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry) (bool, error) { defer func() { - s.revokeMu.Lock() + s.failMu.Lock() s.revoked = false - s.revokeMu.Unlock() + s.revokedCh = make(chan struct{}) + s.lost = false + s.lostCh = make(chan struct{}) + s.failMu.Unlock() }() switch commit { @@ -174,20 +209,22 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry wantCommit := bool(commit) - s.revokeMu.Lock() - revoked := s.revoked + s.failMu.Lock() + failed := s.failed() precommit := s.cl.CommittedOffsets() postcommit := s.cl.UncommittedOffsets() - s.revokeMu.Unlock() + s.failMu.Unlock() var oldGeneration bool var commitErr error - if wantCommit && !revoked { + var g *groupConsumer + + if wantCommit && !failed { var commitErrs []string committed := make(chan struct{}) - s.cl.commitTransactionOffsets(context.Background(), postcommit, + g = s.cl.commitTransactionOffsets(context.Background(), postcommit, func(_ *kmsg.TxnOffsetCommitRequest, resp *kmsg.TxnOffsetCommitResponse, err error) { defer close(committed) if err != nil { @@ -215,14 +252,42 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry } } - s.revokeMu.Lock() - defer s.revokeMu.Unlock() + // Now that we have committed our offsets, before we allow them to be + // used, we force a heartbeat. By forcing a heartbeat, if there is no + // error, then we know we have up to RebalanceTimeout to write our + // EndTxnRequest without a problem. + // + // We should not be booted from the group if we receive an ok + // heartbeat, meaning that, as mentioned, we should be able to end the + // transaction safely. + var okHeartbeat bool + if g != nil && commitErr == nil { + waitHeartbeat := make(chan struct{}) + var heartbeatErr error + select { + case g.heartbeatForceCh <- func(err error) { + defer close(waitHeartbeat) + heartbeatErr = err + }: + select { + case <-waitHeartbeat: + okHeartbeat = heartbeatErr == nil + case <-s.revokedCh: + case <-s.lostCh: + } + case <-s.revokedCh: + case <-s.lostCh: + } + } + + s.failMu.Lock() + defer s.failMu.Unlock() - tryCommit := !s.revoked && commitErr == nil && !oldGeneration + tryCommit := !s.failed() && commitErr == nil && !oldGeneration && okHeartbeat willTryCommit := wantCommit && tryCommit s.cl.cfg.logger.Log(LogLevelInfo, "transaction session ending", - "was_revoked", s.revoked, + "was_failed", s.failed(), "want_commit", wantCommit, "can_try_commit", tryCommit, "will_try_commit", willTryCommit, @@ -547,13 +612,13 @@ func (cl *Client) commitTransactionOffsets( ctx context.Context, uncommitted map[string]map[int32]EpochOffset, onDone func(*kmsg.TxnOffsetCommitRequest, *kmsg.TxnOffsetCommitResponse, error), -) { +) *groupConsumer { cl.cfg.logger.Log(LogLevelDebug, "in commitTransactionOffsets", "with", uncommitted) defer cl.cfg.logger.Log(LogLevelDebug, "left commitTransactionOffsets") if cl.cfg.txnID == nil { onDone(nil, nil, errNotTransactional) - return + return nil } // Before committing, ensure we are at least in a transaction. We @@ -563,18 +628,18 @@ func (cl *Client) commitTransactionOffsets( if !cl.producer.inTxn { onDone(nil, nil, errNotInTransaction) cl.producer.txnMu.Unlock() - return + return nil } cl.producer.txnMu.Unlock() g, ok := cl.consumer.loadGroup() if !ok { onDone(new(kmsg.TxnOffsetCommitRequest), new(kmsg.TxnOffsetCommitResponse), errNotGroup) - return + return nil } if len(uncommitted) == 0 { onDone(new(kmsg.TxnOffsetCommitRequest), new(kmsg.TxnOffsetCommitResponse), nil) - return + return g } g.mu.Lock() @@ -585,12 +650,13 @@ func (cl *Client) commitTransactionOffsets( if onDone != nil { onDone(nil, nil, err) } - return + return g } g.offsetsAddedToTxn = true } g.commitTxn(ctx, uncommitted, onDone) + return g } // Ties a transactional producer to a group. Since this requires a producer ID,