Skip to content

Commit

Permalink
sink: create producerID *BEFORE* produce request (partial revert of d…
Browse files Browse the repository at this point in the history
…c44d10)

dc44d10 was done because a metadata update would trigger a sink to
drain, and that would ALWAYS create a producer ID, and would then see
there was nothing to produce. So, we were creating producer IDs even for
pure consumers.

Unfortunately, switching the order meant that the errReloadProducerID /
needSeqReset combo was flawed. In fact, I even noted that the order
needed to be producerID and then createReq in the bottom of the docs
just above the producerID call.

We now re-move the producerID call above createReq, but we preserve the
intent of dc44d10 by checking if any records are buffered before
loading the producer ID.

This also adds an important doc comment with a big NOTE just above
createReq to prevent this order flipping from occurring again, as well
as makes the prior doc comment just above producerID more prominent.
  • Loading branch information
twmb committed May 4, 2021
1 parent 5475f6b commit 9ada82d
Showing 1 changed file with 30 additions and 15 deletions.
45 changes: 30 additions & 15 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,16 @@ func (cl *Client) newSink(nodeID int32) *sink {

// createReq returns a produceRequest from currently buffered records
// and whether there are more records to create more requests immediately.
func (s *sink) createReq() (*produceRequest, *kmsg.AddPartitionsToTxnRequest, bool) {
func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddPartitionsToTxnRequest, bool) {
req := &produceRequest{
txnID: s.cl.cfg.txnID,
acks: s.cl.cfg.acks.val,
timeout: int32(s.cl.cfg.produceTimeout.Milliseconds()),
batches: make(seqRecBatches, 5),

idempotent: s.cl.idempotent(),
producerID: id,
producerEpoch: epoch,
idempotent: s.cl.idempotent(),

compressor: s.cl.compressor,

Expand All @@ -92,6 +94,8 @@ func (s *sink) createReq() (*produceRequest, *kmsg.AddPartitionsToTxnRequest, bo
}
txnBuilder := txnReqBuilder{
txnID: req.txnID,
id: id,
epoch: epoch,
}

var moreToDrain bool
Expand Down Expand Up @@ -136,6 +140,8 @@ func (s *sink) createReq() (*produceRequest, *kmsg.AddPartitionsToTxnRequest, bo
type txnReqBuilder struct {
txnID *string
req *kmsg.AddPartitionsToTxnRequest
id int64
epoch int16
addedTopics map[string]int // topic => index into req
}

Expand All @@ -150,6 +156,8 @@ func (t *txnReqBuilder) add(rb *recBuf) {
if t.req == nil {
t.req = &kmsg.AddPartitionsToTxnRequest{
TransactionalID: *t.txnID,
ProducerID: t.id,
ProducerEpoch: t.epoch,
}
t.addedTopics = make(map[string]int, 10)
}
Expand Down Expand Up @@ -255,12 +263,11 @@ func (s *sink) produce(sem <-chan struct{}) bool {
}
}()

// We create the req before getting our producer ID. It is possible we
// were triggered to produce by a metadata update adding a recBuf to a
// sink, when in reality no records are actually on that recBuf.
req, txnReq, moreToDrain := s.createReq()
if len(req.batches) == 0 { // everything was failing or lingering
return moreToDrain
// We could have been triggered from a metadata update even though the
// user is not producing at all. If we have no buffered records, let's
// avoid potentially creating a producer ID.
if atomic.LoadInt64(&s.cl.producer.bufferedRecords) == 0 {
return false
}

// producerID can fail from:
Expand All @@ -273,8 +280,9 @@ func (s *sink) produce(sem <-chan struct{}) bool {
// EndTransaction in specific cases, but regardless, all buffered
// records must fail.
//
// We init the producer ID before creating a request to ensure we are
// always using the latest id/epoch with the proper sequence numbers.
// NOTE: we init the producer ID before creating a request to ensure we
// are always using the latest id/epoch with the proper sequence
// numbers. (i.e., resetAllSequenceNumbers && producerID logic combo).
id, epoch, err := s.cl.producerID()
if err != nil {
switch err {
Expand All @@ -291,13 +299,20 @@ func (s *sink) produce(sem <-chan struct{}) bool {
return false
}

req.producerID = id
req.producerEpoch = epoch
// NOTE: we create the req AFTER getting our producer ID!
//
// If a prior response caused errReloadProducerID, then calling
// producerID() sets needSeqReset, and creating the request resets
// sequence numbers. We need to have that logic occur before we create
// the request, otherwise we will create a request with the old
// sequence numbers using our new producer ID, which will then again
// fail with OOOSN.
req, txnReq, moreToDrain := s.createReq(id, epoch)
if len(req.batches) == 0 { // everything was failing or lingering
return moreToDrain
}

if txnReq != nil {
txnReq.ProducerID = id
txnReq.ProducerEpoch = epoch

// txnReq can fail from:
// - retry failure
// - auth failure
Expand Down

0 comments on commit 9ada82d

Please sign in to comment.