Skip to content

Commit

Permalink
kgo: audit for KIP-360 correctness
Browse files Browse the repository at this point in the history
- in producer err, if if txnID != nil, then we always need to fail,
  not just if we see kip-360 conditions
- handles UnknownProducerIDMapping now
- bumps epoch locally for the idempotent producer
- allows failed requests that the broker did not reply to to retry
  by returning that the request failed entirely
- keeps the last id / epoch on failure
  - this does nothing; does not hurt though
- expands documentation

This is in prep for KIP-588
  • Loading branch information
twmb committed Sep 7, 2020
1 parent 2eb27c6 commit 60de0da
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 15 deletions.
34 changes: 26 additions & 8 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kgo
import (
"context"
"errors"
"math"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -229,8 +230,22 @@ func (cl *Client) producerID() (int64, int16, error) {
defer cl.producer.idMu.Unlock()

if id = cl.producer.id.Load().(*producerID); id.err == errReloadProducerID {
id = cl.doInitProducerID(id.id, id.epoch)
cl.producer.id.Store(id)
// For the idempotent producer, as specified in KIP-360,
// if we had an ID, we can bump the epoch locally.
// If we are at the max epoch, we will ask for a new ID.
if cl.cfg.txnID == nil && id.id >= 0 && id.epoch < math.MaxInt16-1 {
cl.producer.id.Store(&producerID{
id: id.id,
epoch: id.epoch + 1,
err: nil,
})
} else {
newID, keep := cl.doInitProducerID(id.id, id.epoch)
if keep {
id = newID
cl.producer.id.Store(id)
}
}
}
}

Expand Down Expand Up @@ -270,7 +285,10 @@ func (cl *Client) failProducerID(id int64, epoch int16, err error) {

// doInitProducerID inits the idempotent ID and potentially the transactional
// producer epoch.
func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) *producerID {
//
// This returns false only if our request failed and not due to the key being
// unknown to the broker.
func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) (*producerID, bool) {
cl.cfg.logger.Log(LogLevelInfo, "initializing producer id")
req := &kmsg.InitProducerIDRequest{
TransactionalID: cl.cfg.txnID,
Expand All @@ -290,15 +308,15 @@ func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) *producerID {
// what we hit first is the default.
if err == ErrUnknownRequestKey {
cl.cfg.logger.Log(LogLevelInfo, "unable to initialize a producer id because the broker is too old, continuing without a producer id")
return &producerID{-1, -1, nil}
return &producerID{-1, -1, nil}, true
}
cl.cfg.logger.Log(LogLevelInfo, "producer id initialization failure", "err", err)
return &producerID{-1, -1, err}
cl.cfg.logger.Log(LogLevelInfo, "producer id initialization failure, discarding initialization attempt", "err", err)
return &producerID{lastID, lastEpoch, err}, false
}
resp := kresp.(*kmsg.InitProducerIDResponse)
if err = kerr.ErrorForCode(resp.ErrorCode); err != nil {
cl.cfg.logger.Log(LogLevelInfo, "producer id initialization errored", "err", err)
return &producerID{-1, -1, err}
return &producerID{lastID, lastEpoch, err}, true
}
cl.cfg.logger.Log(LogLevelInfo, "producer id initialization success", "id", resp.ProducerID, "epoch", resp.ProducerEpoch)

Expand All @@ -309,7 +327,7 @@ func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) *producerID {
cl.producer.idVersion = req.Version
}

return &producerID{resp.ProducerID, resp.ProducerEpoch, nil}
return &producerID{resp.ProducerID, resp.ProducerEpoch, nil}, true
}

// partitionsForTopicProduce returns the topic partitions for a record.
Expand Down
37 changes: 31 additions & 6 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,25 +567,35 @@ func (s *sink) handleReqResp(req *produceRequest, resp kmsg.Response, err error)
reqRetry.addSeqBatch(topic, partition, batch)

case err == kerr.OutOfOrderSequenceNumber,
err == kerr.UnknownProducerID:
err == kerr.UnknownProducerID,
err == kerr.InvalidProducerIDMapping:
// OOOSN always means data loss 1.0.0+ and is ambiguous prior.
// We assume the worst and only continue if requested.
//
// UnknownProducerID was introduced to allow some form of safe
// handling, but KIP-360 demonstrated that resetting sequence
// numbers is fundamentally unsafe, so we treat it like OOOSN.
//
// InvalidMapping is similar to UnknownProducerID, but occurs
// when the txnal coordinator timed out our transaction.
//
// 2.5.0
// =====
// 2.5.0 introduced some behavior to potentially safely reset
// the sequence numbers by bumping an epoch (see KIP-360).
//
// For the idempotent producer, the solution is to fail all
// buffered records and then let the client user reset things
// with the understanding that they cannot guard against
// potential dups / reordering at that point. Realistically,
// that's no better than just a config knob that allows
// the user to continue (our stopOnDataLoss flag), so
// we do not try any logic in the idempotent case.
if s.cl.cfg.stopOnDataLoss || err == kerr.UnknownProducerID && s.cl.producer.idVersion >= 3 && s.cl.cfg.txnID != nil {
s.cl.cfg.logger.Log(LogLevelInfo, "batch errored with OutOfOrderSequenceNumber or UnknownProducerID, failing the producer ID",
// that's no better than a config knob that allows the user
// to continue (our stopOnDataLoss flag), so for the idempotent
// producer, if stopOnDataLoss is false, we just continue.
//
// For the transactional producer, we always fail the producerID.
// EndTransaction will trigger recovery if possible.
if s.cl.cfg.txnID != nil || s.cl.cfg.stopOnDataLoss {
s.cl.cfg.logger.Log(LogLevelInfo, "batch errored, failing the producer ID",
"topic", topic,
"partition", partition,
"producer_id", req.producerID,
Expand All @@ -600,6 +610,8 @@ func (s *sink) handleReqResp(req *produceRequest, resp kmsg.Response, err error)
s.cl.cfg.onDataLoss(topic, partition)
}

// For OOOSN,
//
// We could be here because we do not have unlimited
// retries and previously failed a retriable error.
// The broker could technically have been fine, but we
Expand All @@ -612,6 +624,19 @@ func (s *sink) handleReqResp(req *produceRequest, resp kmsg.Response, err error)
// broker that thinks we are still at a high seq when
// we are sending 0. If we did not fail, then we would
// loop with an OOOSN error.
//
// For UnknownProducerID,
//
// We could be here because we have an idempotent producer.
// If we were transactional, we would have failed above.
// We could just reset sequence numbers, but we may as well
// also fail the producer ID which triggers epoch bumping
// and simplifies logic for the OOSN thing described above.
//
// For InvalidProducerIDMapping,
//
// We should not be here, since this error occurs in the
// context of transactions, which would be caught above.
s.cl.cfg.logger.Log(LogLevelInfo, "batch errored with OutOfOrderSequenceNumber or UnknownProducerID, failing the producer ID and resetting the partition sequence number",
"topic", topic,
"partition", partition,
Expand Down
4 changes: 3 additions & 1 deletion pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,9 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry)

switch err.(type) {
case *kerr.Error:
if err != kerr.UnknownProducerID || cl.producer.idVersion <= 2 {
kip360 := cl.producer.idVersion >= 3 &&
(err == kerr.UnknownProducerID || err == kerr.InvalidProducerIDMapping)
if !kip360 {
return err // fatal, unrecoverable
}

Expand Down

0 comments on commit 60de0da

Please sign in to comment.