From 60de0dafbeb7bf72996ad5f24690820962d2f584 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 7 Sep 2020 14:21:59 -0600 Subject: [PATCH] kgo: audit for KIP-360 correctness - in producer err, if txnID != nil, then we always need to fail, not just if we see kip-360 conditions - handles InvalidProducerIDMapping now - bumps epoch locally for the idempotent producer - allows failed requests that the broker did not reply to to retry by returning that the request failed entirely - keeps the last id / epoch on failure - this does nothing; does not hurt though - expands documentation This is in prep for KIP-588 --- pkg/kgo/producer.go | 34 ++++++++++++++++++++++++++-------- pkg/kgo/sink.go | 37 +++++++++++++++++++++++++++++++------ pkg/kgo/txn.go | 4 +++- 3 files changed, 60 insertions(+), 15 deletions(-) diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index c14e4c16..9d0521ef 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -3,6 +3,7 @@ package kgo import ( "context" "errors" + "math" "sync" "sync/atomic" "time" @@ -229,8 +230,22 @@ func (cl *Client) producerID() (int64, int16, error) { defer cl.producer.idMu.Unlock() if id = cl.producer.id.Load().(*producerID); id.err == errReloadProducerID { - id = cl.doInitProducerID(id.id, id.epoch) - cl.producer.id.Store(id) + // For the idempotent producer, as specified in KIP-360, + // if we had an ID, we can bump the epoch locally. + // If we are at the max epoch, we will ask for a new ID. + if cl.cfg.txnID == nil && id.id >= 0 && id.epoch < math.MaxInt16-1 { + cl.producer.id.Store(&producerID{ + id: id.id, + epoch: id.epoch + 1, + err: nil, + }) + } else { + newID, keep := cl.doInitProducerID(id.id, id.epoch) + if keep { + id = newID + cl.producer.id.Store(id) + } + } } } @@ -270,7 +285,10 @@ func (cl *Client) failProducerID(id int64, epoch int16, err error) { // doInitProducerID inits the idempotent ID and potentially the transactional // producer epoch. 
-func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) *producerID { +// +// This returns false only if our request failed and not due to the key being +// unknown to the broker. +func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) (*producerID, bool) { cl.cfg.logger.Log(LogLevelInfo, "initializing producer id") req := &kmsg.InitProducerIDRequest{ TransactionalID: cl.cfg.txnID, @@ -290,15 +308,15 @@ func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) *producerID { // what we hit first is the default. if err == ErrUnknownRequestKey { cl.cfg.logger.Log(LogLevelInfo, "unable to initialize a producer id because the broker is too old, continuing without a producer id") - return &producerID{-1, -1, nil} + return &producerID{-1, -1, nil}, true } - cl.cfg.logger.Log(LogLevelInfo, "producer id initialization failure", "err", err) - return &producerID{-1, -1, err} + cl.cfg.logger.Log(LogLevelInfo, "producer id initialization failure, discarding initialization attempt", "err", err) + return &producerID{lastID, lastEpoch, err}, false } resp := kresp.(*kmsg.InitProducerIDResponse) if err = kerr.ErrorForCode(resp.ErrorCode); err != nil { cl.cfg.logger.Log(LogLevelInfo, "producer id initialization errored", "err", err) - return &producerID{-1, -1, err} + return &producerID{lastID, lastEpoch, err}, true } cl.cfg.logger.Log(LogLevelInfo, "producer id initialization success", "id", resp.ProducerID, "epoch", resp.ProducerEpoch) @@ -309,7 +327,7 @@ func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) *producerID { cl.producer.idVersion = req.Version } - return &producerID{resp.ProducerID, resp.ProducerEpoch, nil} + return &producerID{resp.ProducerID, resp.ProducerEpoch, nil}, true } // partitionsForTopicProduce returns the topic partitions for a record. 
diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 6909fd38..5a8d43a7 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -567,7 +567,8 @@ func (s *sink) handleReqResp(req *produceRequest, resp kmsg.Response, err error) reqRetry.addSeqBatch(topic, partition, batch) case err == kerr.OutOfOrderSequenceNumber, - err == kerr.UnknownProducerID: + err == kerr.UnknownProducerID, + err == kerr.InvalidProducerIDMapping: // OOOSN always means data loss 1.0.0+ and is ambiguous prior. // We assume the worst and only continue if requested. // @@ -575,17 +576,26 @@ func (s *sink) handleReqResp(req *produceRequest, resp kmsg.Response, err error) // handling, but KIP-360 demonstrated that resetting sequence // numbers is fundamentally unsafe, so we treat it like OOOSN. // + // InvalidMapping is similar to UnknownProducerID, but occurs + // when the txnal coordinator timed out our transaction. + // + // 2.5.0 + // ===== // 2.5.0 introduced some behavior to potentially safely reset // the sequence numbers by bumping an epoch (see KIP-360). + // // For the idempotent producer, the solution is to fail all // buffered records and then let the client user reset things // with the understanding that they cannot guard against // potential dups / reordering at that point. Realistically, - // that's no better than just a config knob that allows - // the user to continue (our stopOnDataLoss flag), so - // we do not try any logic in the idempotent case. - if s.cl.cfg.stopOnDataLoss || err == kerr.UnknownProducerID && s.cl.producer.idVersion >= 3 && s.cl.cfg.txnID != nil { - s.cl.cfg.logger.Log(LogLevelInfo, "batch errored with OutOfOrderSequenceNumber or UnknownProducerID, failing the producer ID", + // that's no better than a config knob that allows the user + // to continue (our stopOnDataLoss flag), so for the idempotent + // producer, if stopOnDataLoss is false, we just continue. + // + // For the transactional producer, we always fail the producerID. 
+ // EndTransaction will trigger recovery if possible. + if s.cl.cfg.txnID != nil || s.cl.cfg.stopOnDataLoss { + s.cl.cfg.logger.Log(LogLevelInfo, "batch errored, failing the producer ID", "topic", topic, "partition", partition, "producer_id", req.producerID, @@ -600,6 +610,8 @@ func (s *sink) handleReqResp(req *produceRequest, resp kmsg.Response, err error) s.cl.cfg.onDataLoss(topic, partition) } + // For OOOSN, + // // We could be here because we do not have unlimited // retries and previously failed a retriable error. // The broker could technically have been fine, but we @@ -612,6 +624,19 @@ func (s *sink) handleReqResp(req *produceRequest, resp kmsg.Response, err error) // broker that thinks we are still at a high seq when // we are sending 0. If we did not fail, then we would // loop with an OOOSN error. + // + // For UnknownProducerID, + // + // We could be here because we have an idempotent producer. + // If we were transactional, we would have failed above. + // We could just reset sequence numbers, but we may as well + // also fail the producer ID which triggers epoch bumping + // and simplifies logic for the OOOSN thing described above. + // + // For InvalidProducerIDMapping, + // + // We should not be here, since this error occurs in the + // context of transactions, which would be caught above. 
s.cl.cfg.logger.Log(LogLevelInfo, "batch errored with OutOfOrderSequenceNumber or UnknownProducerID, failing the producer ID and resetting the partition sequence number", "topic", topic, "partition", partition, diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index a44b45fb..8bf5aee0 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -360,7 +360,9 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry) switch err.(type) { case *kerr.Error: - if err != kerr.UnknownProducerID || cl.producer.idVersion <= 2 { + kip360 := cl.producer.idVersion >= 3 && + (err == kerr.UnknownProducerID || err == kerr.InvalidProducerIDMapping) + if !kip360 { return err // fatal, unrecoverable }