From 83b0a328548b1d76bd694fe1dbdd2752a21e33d1 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 21 Mar 2022 15:07:59 -0600 Subject: [PATCH] kgo: EndAndBeginTransaction w/ EndBeginTxnUnsafe: fix three issues - Depending on timing and logic races, EndTxn could encounter INVALID_TXN_STATE. We now ignore this error in the one place we expect to potentially encounter it, and have a long doc on why we ignore it. The issue is expected to be rare and has only been encountered by forcing pathological request processing with sleeps. - We now ensure we set addedToTxn once promises are called. Previously, we would set before promises are called. If a user was using these promises to know if they needed to commit, they previously could have committed before everything was promised and never committed again. This would only affect a user on shutdown if they were not careful. - We now ensure a final commit will be issued if we ever readd partitions to a transaction. Previously, if we added partitions but stopped producing, we would not commit when ending again and the transaction would timeout and our txnal ID would be fenced. Now, we set that we have readded and ensure we commit. --- pkg/kgo/producer.go | 31 +++++++++++++++++++++++++++++++ pkg/kgo/sink.go | 13 ++++++++++++- pkg/kgo/txn.go | 34 ++++++++++++++++++++++++++++++++-- 3 files changed, 75 insertions(+), 3 deletions(-) diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 9e90df00..5f0ea55d 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -60,6 +60,19 @@ type producer struct { txnMu sync.Mutex inTxn bool + + // If using EndBeginTxnUnsafe, and any partitions are actually produced + // to, we issue an AddPartitionsToTxn at the end to re-add them to a + // new transaction. We have to due to logic races: the broker may not + // have handled the produce requests yet, and we want to ensure a new + // transaction is started. + // + // If the user stops producing, we want to ensure that our restarted + // transaction is actually ended. Thus, we set readded whenever we have + // partitions we actually restart. We issue EndTxn and reset readded in + // EndAndBegin; if nothing more was produced to, we ensure we finish + // the started txn. + readded bool } // BufferedProduceRecords returns the number of records currently buffered for @@ -419,6 +432,21 @@ func (p *producer) finishPromises(b batchPromise) { cl := p.cl var more bool start: + // If we are transactional, we want to set that records were added to a + // transaction after we finish a batch's promises. This is only + // encessary when using EndBeginTxnUnsafe: if we did not set after the + // promises, it is possible that the records finish but the client does + // not think another EndTxn needs to be issued. + // + // To keep our batchPromise struct to 64 bytes (better copying), we + // stuff the atomic bool into the error. We only need to mark things + // added if the batch did not error. + var addedToTxn *atomicBool + if sa := (stuffedAtomic{}); errors.As(b.err, &sa) { + b.err = nil + addedToTxn = sa.b + } + p.promisesMu.Lock() for i, pr := range b.recs { pr.Offset = b.baseOffset + int64(i) @@ -429,6 +457,9 @@ start: cl.finishRecordPromise(pr, b.err) b.recs[i] = promisedRec{} } + if addedToTxn != nil { + addedToTxn.set(true) + } p.promisesMu.Unlock() if cap(b.recs) > 4 { cl.prsPool.put(b.recs) diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 03aee1d7..bdfb8f4f 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -867,6 +867,13 @@ func (cl *Client) finishBatch(batch *recBatch, producerID int64, producerEpoch i batch.records = nil batch.mu.Unlock() + // See comment in finishPromises: we have no error, so if we are + // transactional, we stuff our addedToTxn bool into a fake error. + var sa error + if cl.cfg.txnID != nil { + sa = stuffedAtomic{&batch.owner.addedToTxn} + } + cl.producer.promiseBatch(batchPromise{ baseOffset: baseOffset, pid: producerID, @@ -879,10 +886,14 @@ func (cl *Client) finishBatch(batch *recBatch, producerID int64, producerEpoch i attrs: RecordAttrs{uint8(attrs)}, partition: partition, recs: records, - err: err, + err: sa, }) } +type stuffedAtomic struct{ b *atomicBool } + +func (stuffedAtomic) Error() string { panic("unreachable") } + // handleRetryBatches sets any first-buf-batch to failing and triggers a // metadata that will eventually clear the failing state and re-drain. // diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 0723b7db..93a4bba3 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -567,10 +567,11 @@ func (cl *Client) EndAndBeginTransaction( } } } + anyAdded = anyAdded || cl.producer.readded // EndTxn when no txn was started returns INVALID_TXN_STATE. if !anyAdded { - cl.cfg.logger.Log(LogLevelInfo, "no records were produced during the commit; thus no transaction was began; ending without doing anything") + cl.cfg.logger.Log(LogLevelDebug, "no records were produced during the commit; thus no transaction was began; ending without doing anything") return nil } @@ -590,6 +591,7 @@ func (cl *Client) EndAndBeginTransaction( "epoch", epoch, "commit", commit, ) + cl.producer.readded = false err = cl.doWithConcurrentTransactions("EndTxn", func() error { req := kmsg.NewPtrEndTxnRequest() req.TransactionalID = *cl.cfg.txnID @@ -600,6 +602,33 @@ func (cl *Client) EndAndBeginTransaction( if err != nil { return err } + + // When ending a transaction, if the user is using unsafe mode, + // there is a logic race where the user can actually end before + // AddPartitionsToTxn is issued. This should be rare and is + // most likely only to happen whenever a new transaction is + // starting from a not-in-transaction state (i.e., the first + // transaction). If we see InvalidTxnState in unsafe mode, we + // assume that a transaction was not actually begun and we + // return success. + // + // In Kafka, InvalidTxnState is also returned when producing + // non-transactional records from a producer that is currently + // in a transaction. + // + // All other cases it is returned is in EndTxn: + // * state == CompleteCommit and EndTxn != commit + // * state == CompleteAbort and EndTxn != abort + // * state == PrepareCommit and EndTxn != commit (otherwise, returns concurrent transactions) + // * state == PrepareAbort and EndTxn != abort (otherwise, returns concurrent transactions) + // * state == Empty + // + // This basically guards against the final case, all others are + // Kafka internal state transitioning and we should never hit + // them. + if how == EndBeginTxnUnsafe && resp.ErrorCode == kerr.InvalidTxnState.Code { + return nil + } return kerr.ErrorForCode(resp.ErrorCode) }) var ke *kerr.Error @@ -670,6 +699,7 @@ func (cl *Client) EndAndBeginTransaction( } } } + cl.producer.readded = true return nil }) } @@ -773,7 +803,7 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry) // Note that anyAdded is true if the producer ID was failed, meaning we will // get to the potential recovery logic below if necessary. if !anyAdded { - cl.cfg.logger.Log(LogLevelInfo, "no records were produced during the commit; thus no transaction was began; ending without doing anything") + cl.cfg.logger.Log(LogLevelDebug, "no records were produced during the commit; thus no transaction was began; ending without doing anything") return nil }