From 9ada82d24af761826f9b3359e02b0bc9cfaeb45b Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 3 May 2021 20:08:55 -0600 Subject: [PATCH] sink: create producerID *BEFORE* produce request (partial revert of dc44d10b) dc44d10b was done because a metadata update would trigger a sink to drain, and that would ALWAYS create a producer ID, and would then see there was nothing to produce. So, we were creating producer IDs even for pure consumers. Unfortunately, switching the order meant that the errReloadProducerID / needSeqReset combo was flawed. In fact, I even noted that the order needed to be producerID and then createReq at the bottom of the docs just above the producerID call. We now move the producerID call back above createReq, but we preserve the intent of dc44d10b by checking if any records are buffered before loading the producer ID. This also adds an important doc comment with a big NOTE just above createReq to prevent this order flipping from occurring again, and makes the prior doc comment just above producerID more prominent. --- pkg/kgo/sink.go | 45 ++++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index fe6beb3f..848f07db 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -76,14 +76,16 @@ func (cl *Client) newSink(nodeID int32) *sink { // createReq returns a produceRequest from currently buffered records // and whether there are more records to create more requests immediately. 
-func (s *sink) createReq() (*produceRequest, *kmsg.AddPartitionsToTxnRequest, bool) { +func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddPartitionsToTxnRequest, bool) { req := &produceRequest{ txnID: s.cl.cfg.txnID, acks: s.cl.cfg.acks.val, timeout: int32(s.cl.cfg.produceTimeout.Milliseconds()), batches: make(seqRecBatches, 5), - idempotent: s.cl.idempotent(), + producerID: id, + producerEpoch: epoch, + idempotent: s.cl.idempotent(), compressor: s.cl.compressor, @@ -92,6 +94,8 @@ func (s *sink) createReq() (*produceRequest, *kmsg.AddPartitionsToTxnRequest, bo } txnBuilder := txnReqBuilder{ txnID: req.txnID, + id: id, + epoch: epoch, } var moreToDrain bool @@ -136,6 +140,8 @@ func (s *sink) createReq() (*produceRequest, *kmsg.AddPartitionsToTxnRequest, bo type txnReqBuilder struct { txnID *string req *kmsg.AddPartitionsToTxnRequest + id int64 + epoch int16 addedTopics map[string]int // topic => index into req } @@ -150,6 +156,8 @@ func (t *txnReqBuilder) add(rb *recBuf) { if t.req == nil { t.req = &kmsg.AddPartitionsToTxnRequest{ TransactionalID: *t.txnID, + ProducerID: t.id, + ProducerEpoch: t.epoch, } t.addedTopics = make(map[string]int, 10) } @@ -255,12 +263,11 @@ func (s *sink) produce(sem <-chan struct{}) bool { } }() - // We create the req before getting our producer ID. It is possible we - // were triggered to produce by a metadata update adding a recBuf to a - // sink, when in reality no records are actually on that recBuf. - req, txnReq, moreToDrain := s.createReq() - if len(req.batches) == 0 { // everything was failing or lingering - return moreToDrain + // We could have been triggered from a metadata update even though the + // user is not producing at all. If we have no buffered records, let's + // avoid potentially creating a producer ID. 
+ if atomic.LoadInt64(&s.cl.producer.bufferedRecords) == 0 { + return false } // producerID can fail from: @@ -273,8 +280,9 @@ func (s *sink) produce(sem <-chan struct{}) bool { // EndTransaction in specific cases, but regardless, all buffered // records must fail. // - // We init the producer ID before creating a request to ensure we are - // always using the latest id/epoch with the proper sequence numbers. + // NOTE: we init the producer ID before creating a request to ensure we + // are always using the latest id/epoch with the proper sequence + // numbers. (i.e., resetAllSequenceNumbers && producerID logic combo). id, epoch, err := s.cl.producerID() if err != nil { switch err { @@ -291,13 +299,20 @@ func (s *sink) produce(sem <-chan struct{}) bool { return false } - req.producerID = id - req.producerEpoch = epoch + // NOTE: we create the req AFTER getting our producer ID! + // + // If a prior response caused errReloadProducerID, then calling + // producerID() sets needSeqReset, and creating the request resets + // sequence numbers. We need to have that logic occur before we create + // the request, otherwise we will create a request with the old + // sequence numbers using our new producer ID, which will then again + // fail with OOOSN. + req, txnReq, moreToDrain := s.createReq(id, epoch) + if len(req.batches) == 0 { // everything was failing or lingering + return moreToDrain + } if txnReq != nil { - txnReq.ProducerID = id - txnReq.ProducerEpoch = epoch - // txnReq can fail from: // - retry failure // - auth failure