kgo: EndAndBeginTransaction w/ EndBeginTxnUnsafe: fix three issues
- Depending on timing and logic races, EndTxn could encounter
INVALID_TXN_STATE. We now ignore this error in the one place we expect
to potentially encounter it, and have a long doc on why we ignore it.
The issue is expected to be rare and has only been encountered by
forcing pathological request processing with sleeps.

- We now ensure we set addedToTxn once promises are called. Previously,
we would set it before promises were called. If a user was using these
promises to know if they needed to commit, they previously could have
committed before everything was promised and never committed again. This
would only affect a user on shutdown if they were not careful.

- We now ensure a final commit will be issued if we ever re-add
partitions to a transaction. Previously, if we added partitions but
stopped producing, we would not commit when ending again and the
transaction would time out and our transactional ID would be fenced.
Now, we set that we have re-added and ensure we commit.
twmb committed Mar 21, 2022
1 parent bd1d43d commit 83b0a32
Showing 3 changed files with 75 additions and 3 deletions.
31 changes: 31 additions & 0 deletions pkg/kgo/producer.go
@@ -60,6 +60,19 @@ type producer struct {

txnMu sync.Mutex
inTxn bool

// If using EndBeginTxnUnsafe, and any partitions are actually produced
// to, we issue an AddPartitionsToTxn at the end to re-add them to a
// new transaction. We have to due to logic races: the broker may not
// have handled the produce requests yet, and we want to ensure a new
// transaction is started.
//
// If the user stops producing, we want to ensure that our restarted
// transaction is actually ended. Thus, we set readded whenever we have
// partitions we actually restart. We issue EndTxn and reset readded in
// EndAndBegin; if nothing more was produced to, we ensure we finish
// the started txn.
readded bool
}

// BufferedProduceRecords returns the number of records currently buffered for
@@ -419,6 +432,21 @@ func (p *producer) finishPromises(b batchPromise) {
cl := p.cl
var more bool
start:
// If we are transactional, we want to set that records were added to a
// transaction after we finish a batch's promises. This is only
// necessary when using EndBeginTxnUnsafe: if we did not set after the
// promises, it is possible that the records finish but the client does
// not think another EndTxn needs to be issued.
//
// To keep our batchPromise struct to 64 bytes (better copying), we
// stuff the atomic bool into the error. We only need to mark things
// added if the batch did not error.
var addedToTxn *atomicBool
if sa := (stuffedAtomic{}); errors.As(b.err, &sa) {
b.err = nil
addedToTxn = sa.b
}

p.promisesMu.Lock()
for i, pr := range b.recs {
pr.Offset = b.baseOffset + int64(i)
@@ -429,6 +457,9 @@ start:
cl.finishRecordPromise(pr, b.err)
b.recs[i] = promisedRec{}
}
if addedToTxn != nil {
addedToTxn.set(true)
}
p.promisesMu.Unlock()
if cap(b.recs) > 4 {
cl.prsPool.put(b.recs)
13 changes: 12 additions & 1 deletion pkg/kgo/sink.go
@@ -867,6 +867,13 @@ func (cl *Client) finishBatch(batch *recBatch, producerID int64, producerEpoch i
batch.records = nil
batch.mu.Unlock()

// See comment in finishPromises: we have no error, so if we are
// transactional, we stuff our addedToTxn bool into a fake error.
var sa error
if cl.cfg.txnID != nil {
sa = stuffedAtomic{&batch.owner.addedToTxn}
}

cl.producer.promiseBatch(batchPromise{
baseOffset: baseOffset,
pid: producerID,
@@ -879,10 +886,14 @@ func (cl *Client) finishBatch(batch *recBatch, producerID int64, producerEpoch i
attrs: RecordAttrs{uint8(attrs)},
partition: partition,
recs: records,
-err: err,
+err: sa,
})
}

type stuffedAtomic struct{ b *atomicBool }

func (stuffedAtomic) Error() string { panic("unreachable") }

// handleRetryBatches sets any first-buf-batch to failing and triggers a
// metadata that will eventually clear the failing state and re-drain.
//
34 changes: 32 additions & 2 deletions pkg/kgo/txn.go
@@ -567,10 +567,11 @@ func (cl *Client) EndAndBeginTransaction(
}
}
}
anyAdded = anyAdded || cl.producer.readded

// EndTxn when no txn was started returns INVALID_TXN_STATE.
if !anyAdded {
-cl.cfg.logger.Log(LogLevelInfo, "no records were produced during the commit; thus no transaction was began; ending without doing anything")
+cl.cfg.logger.Log(LogLevelDebug, "no records were produced during the commit; thus no transaction was began; ending without doing anything")
return nil
}

@@ -590,6 +591,7 @@ func (cl *Client) EndAndBeginTransaction(
"epoch", epoch,
"commit", commit,
)
cl.producer.readded = false
err = cl.doWithConcurrentTransactions("EndTxn", func() error {
req := kmsg.NewPtrEndTxnRequest()
req.TransactionalID = *cl.cfg.txnID
@@ -600,6 +602,33 @@ func (cl *Client) EndAndBeginTransaction(
if err != nil {
return err
}

// When ending a transaction, if the user is using unsafe mode,
// there is a logic race where the user can actually end before
// AddPartitionsToTxn is issued. This should be rare and is
// most likely only to happen whenever a new transaction is
// starting from a not-in-transaction state (i.e., the first
// transaction). If we see InvalidTxnState in unsafe mode, we
// assume that a transaction was not actually begun and we
// return success.
//
// In Kafka, InvalidTxnState is also returned when producing
// non-transactional records from a producer that is currently
// in a transaction.
//
// All other cases in which it is returned are in EndTxn:
// * state == CompleteCommit and EndTxn != commit
// * state == CompleteAbort and EndTxn != abort
// * state == PrepareCommit and EndTxn != commit (otherwise, returns concurrent transactions)
// * state == PrepareAbort and EndTxn != abort (otherwise, returns concurrent transactions)
// * state == Empty
//
// This basically guards against the final case; all others are
// Kafka internal state transitions and we should never hit
// them.
if how == EndBeginTxnUnsafe && resp.ErrorCode == kerr.InvalidTxnState.Code {
return nil
}
return kerr.ErrorForCode(resp.ErrorCode)
})
var ke *kerr.Error
@@ -670,6 +699,7 @@ func (cl *Client) EndAndBeginTransaction(
}
}
}
cl.producer.readded = true
return nil
})
}
@@ -773,7 +803,7 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry)
// Note that anyAdded is true if the producer ID was failed, meaning we will
// get to the potential recovery logic below if necessary.
if !anyAdded {
-cl.cfg.logger.Log(LogLevelInfo, "no records were produced during the commit; thus no transaction was began; ending without doing anything")
+cl.cfg.logger.Log(LogLevelDebug, "no records were produced during the commit; thus no transaction was began; ending without doing anything")
return nil
}
