From 31f3f5f5ba0a8f614873c35b1bcfc3506c93b14e Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 26 Feb 2022 17:29:35 -0700 Subject: [PATCH] producer: serialize promises Having promises potentially be concurrent was originally thought to be an optimization, but overall after extensive benchmarking, it may not be as much of an optimization as thought. Previously, records that failed independent of producing could have their promises called whenever. Produced records would always have their promises called at the end of handling a produce response. Technically, this means that if a broker has max produce requests in flight, starting a new produce request would block on finishing promises for a previous one if the promises were slow. Now, we send all promises to a dedicated loop. This keeps our prior guarantee of strict ordering and divorces promises from produce requests. I expect the common case for promises is to serialize the records anyway, which implies locking within the promise. Hopefully, that locking can now be removed. It is possible that promises that did no work or did atomic work could be slower now, but extensive testing with franz-go's bench utility (and changing the code to auto-success produce requests) shows no difference -- if anything, serializing may be slightly faster. --- pkg/kgo/errors.go | 2 + pkg/kgo/group_test.go | 11 +-- pkg/kgo/producer.go | 163 +++++++++++++++++-------------- pkg/kgo/record_and_fetch.go | 3 +- pkg/kgo/ring.go | 62 ++++++++++++ pkg/kgo/sink.go | 34 +++---- pkg/kgo/topics_and_partitions.go | 5 +- pkg/kgo/txn_test.go | 8 +- 8 files changed, 179 insertions(+), 109 deletions(-) diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index 82cf213b..e9e78516 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -149,6 +149,8 @@ var ( // Returned when trying to produce a record outside of a transaction. errNotInTransaction = errors.New("cannot produce record transactionally if not in a transaction") + errNoTopic = errors.New("cannot produce record with no topic and no default topic") + // Returned for all buffered produce records when a user purges topics. errPurged = errors.New("topic purged while buffered") diff --git a/pkg/kgo/group_test.go b/pkg/kgo/group_test.go index bf85e0b0..cd7e72d2 100644 --- a/pkg/kgo/group_test.go +++ b/pkg/kgo/group_test.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "strconv" - "sync" "sync/atomic" "testing" "time" @@ -37,10 +36,12 @@ func TestGroupETL(t *testing.T) { //////////////////// go func() { - cl, _ := NewClient(WithLogger(BasicLogger(os.Stderr, testLogLevel, nil))) + cl, _ := NewClient( + WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)), + MaxBufferedRecords(10000), + ) defer cl.Close() - var offsetsMu sync.Mutex offsets := make(map[int32]int64) defer func() { @@ -66,7 +67,6 @@ func TestGroupETL(t *testing.T) { } // ensure the offsets for this partition are contiguous - offsetsMu.Lock() current, ok := offsets[r.Partition] if ok && r.Offset != current+1 { errs <- fmt.Errorf("partition produced offsets out of order, got %d != exp %d", r.Offset, current+1) @@ -74,8 +74,6 @@ func TestGroupETL(t *testing.T) { errs <- fmt.Errorf("expected first produced record to partition to have offset 0, got %d", r.Offset) } offsets[r.Partition] = r.Offset - - offsetsMu.Unlock() }, ) } @@ -122,6 +120,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { ConsumerGroup(c.group), ConsumeTopics(c.consumeFrom), Balancers(c.balancer), + MaxBufferedRecords(10000), // Even with autocommitting, autocommitting does not commit // *the latest* when being revoked. We always want to commit diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 6f2d3c23..8f0fcb51 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -52,6 +52,8 @@ type producer struct { notifyMu sync.Mutex notifyCond *sync.Cond + batchPromises ringBatchPromise + txnMu sync.Mutex inTxn bool } @@ -75,7 +77,7 @@ func (p *producer) init(cl *Client) { p.cl = cl p.topics = newTopicsPartitions() p.unknownTopics = make(map[string]*unknownTopicProduces) - p.waitBuffer = make(chan struct{}, 32) + p.waitBuffer = make(chan struct{}, math.MaxInt64) p.idVersion = -1 p.id.Store(&producerID{ id: -1, @@ -114,7 +116,10 @@ func (p *producer) purgeTopics(topics []string) { if unknown, exists := p.unknownTopics[topic]; exists { delete(p.unknownTopics, topic) close(unknown.wait) - p.failUnknownTopicRecords(unknown, errPurged) + p.promiseBatch(batchPromise{ + recs: unknown.buffered, + err: errPurged, + }) } } p.unknownTopicsMu.Unlock() @@ -203,12 +208,9 @@ func (rs ProduceResults) First() (*Record, error) { func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults { var ( wg sync.WaitGroup - mu sync.Mutex results = make(ProduceResults, 0, len(rs)) promise = func(r *Record, err error) { - mu.Lock() results = append(results, ProduceResult{r, err}) - mu.Unlock() wg.Done() } ) @@ -297,15 +299,14 @@ func (cl *Client) TryProduce( // calling an optional `promise` with the record and a potential error when // Kafka replies. For a synchronous produce, see ProduceSync. Records are // produced in order per partition if the record is produced successfully. -// Records that fail to be produced (topic load failures, unbufferable records, -// etc.) may have their promised called at any time, and may be called -// concurrent with other promises. Successfully produced records will have -// their attributes, offset, and partition set before the promise is called. +// Successfully produced records will have their attributes, offset, and +// partition set before the promise is called. All promises are called +// serially (and should be relatively fast). // // If the topic field is empty, the client will use the DefaultProduceTopic; if -// that is also empty, the record will be failed immediately. If the record is -// too large to fit in a batch on its own in a produce request, the record will -// be failed with immediately kerr.MessageTooLarge. +// that is also empty, the record is failed immediately. If the record is too +// large to fit in a batch on its own in a produce request, the record will be +// failed with immediately kerr.MessageTooLarge. // // If the client is configured to automatically flush the client currently has // the configured maximum amount of records buffered, Produce will block. The @@ -342,24 +343,7 @@ func (cl *Client) produce( promise = noPromise } - if r.Topic == "" { - def := cl.cfg.defaultProduceTopic - if def == "" { - go promise(r, errors.New("cannot produce to a record that does not have a topic set")) - return - } - r.Topic = def - } - p := &cl.producer - - if cl.cfg.txnID != nil && atomic.LoadUint32(&p.producingTxn) != 1 { - go promise(r, errNotInTransaction) // see comment just below for why we 'go' this - return - } - - // Our record is now "buffered", and past this point will fall into - // finishRecordPromise, where we track it is finished. if p.hooks != nil { for _, h := range p.hooks.buffered { h.OnProduceRecordBuffered(r) @@ -371,18 +355,9 @@ func (cl *Client) produce( // need to un-count our buffering of this record. We also need // to drain a slot from the waitBuffer chan, which could be // sent to right when we are erroring. - // - // We issue the waitBuffer drain in a goroutine because we do - // not want to block sending to it. - // - // We issue the promise finishing in a goroutine because we do - // not want to block Produce on executing the promise. The user - // could be consuming from a channel that is sent to in the - // promise only *after* Produce returns; not executing the - // promise in a goroutine would lead to a deadlock. drainBuffered := func(err error) { - go func() { <-p.waitBuffer }() - go cl.finishRecordPromise(promisedRec{ctx, promise, r}, err) + p.promiseRecord(promisedRec{ctx, promise, r}, err) + <-p.waitBuffer } if !block || cl.cfg.manualFlushing { drainBuffered(ErrMaxBuffered) @@ -399,9 +374,66 @@ func (cl *Client) produce( } } + // Neither of the errors below should be hit in applications. + if r.Topic == "" { + def := cl.cfg.defaultProduceTopic + if def == "" { + p.promiseRecord(promisedRec{ctx, promise, r}, errNoTopic) + return + } + r.Topic = def + } + if cl.cfg.txnID != nil && atomic.LoadUint32(&p.producingTxn) != 1 { + p.promiseRecord(promisedRec{ctx, promise, r}, errNotInTransaction) + return + } + cl.partitionRecord(promisedRec{ctx, promise, r}) } +type batchPromise struct { + baseOffset int64 + pid int64 + epoch int16 + attrs RecordAttrs + partition int32 + recs []promisedRec + err error +} + +func (p *producer) promiseBatch(b batchPromise) { + if first := p.batchPromises.push(b); first { + go p.finishPromises(b) + } +} + +func (p *producer) promiseRecord(pr promisedRec, err error) { + p.promiseBatch(batchPromise{recs: []promisedRec{pr}, err: err}) +} + +func (p *producer) finishPromises(b batchPromise) { + cl := p.cl + var more bool +start: + for i, pr := range b.recs { + pr.Offset = b.baseOffset + int64(i) + pr.Partition = b.partition + pr.ProducerID = b.pid + pr.ProducerEpoch = b.epoch + pr.Attrs = b.attrs + cl.finishRecordPromise(pr, b.err) + b.recs[i] = promisedRec{} + } + if cap(b.recs) > 4 { + cl.prsPool.put(b.recs) + } + + b, more = p.batchPromises.dropPeek() + if more { + goto start + } +} + func (cl *Client) finishRecordPromise(pr promisedRec, err error) { p := &cl.producer @@ -418,7 +450,7 @@ func (cl *Client) finishRecordPromise(pr promisedRec, err error) { buffered := atomic.AddInt64(&p.bufferedRecords, -1) if buffered >= cl.cfg.maxBufferedRecords { - go func() { p.waitBuffer <- struct{}{} }() + p.waitBuffer <- struct{}{} } else if buffered == 0 && atomic.LoadInt32(&p.flushing) > 0 { p.notifyMu.Lock() p.notifyMu.Unlock() // nolint:gocritic,staticcheck // We use the lock as a barrier, unlocking immediately is safe. @@ -441,7 +473,7 @@ func (cl *Client) partitionRecord(pr promisedRec) { // partitions can call this directly. func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPartitionsData, pr promisedRec) { if partsData.loadErr != nil && !kerr.IsRetriable(partsData.loadErr) { - cl.finishRecordPromise(pr, partsData.loadErr) + cl.producer.promiseRecord(pr, partsData.loadErr) return } @@ -456,7 +488,7 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart mapping = partsData.partitions } if len(mapping) == 0 { - cl.finishRecordPromise(pr, errors.New("unable to partition record due to no usable partitions")) + cl.producer.promiseRecord(pr, errors.New("unable to partition record due to no usable partitions")) return } @@ -472,7 +504,7 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart pick = parts.partitioner.Partition(pr.Record, len(mapping)) } if pick < 0 || pick >= len(mapping) { - cl.finishRecordPromise(pr, fmt.Errorf("invalid record partitioning choice of %d from %d available", pick, len(mapping))) + cl.producer.promiseRecord(pr, fmt.Errorf("invalid record partitioning choice of %d from %d available", pick, len(mapping))) return } @@ -492,7 +524,7 @@ func (cl *Client) doPartitionRecord(parts *topicPartitions, partsData *topicPart } if pick < 0 || pick >= len(mapping) { - cl.finishRecordPromise(pr, fmt.Errorf("invalid record partitioning choice of %d from %d available", pick, len(mapping))) + cl.producer.promiseRecord(pr, fmt.Errorf("invalid record partitioning choice of %d from %d available", pick, len(mapping))) return } partition = mapping[pick] @@ -796,30 +828,10 @@ func (cl *Client) waitUnknownTopic( cl.cfg.logger.Log(LogLevelInfo, "new topic metadata wait failed, done retrying, failing all records", "topic", topic, "err", err) delete(p.unknownTopics, topic) - p.failUnknownTopicRecords(unknown, err) -} - -// Called under the unknown mu, this finishes promises for an unknown topic. -// -// We do not delete from the producer's topics due to potential concurrent -// metadata updating issues: if the metadata has an active request loading for -// a topic we are actively deleting now, and that request finally loads the -// topic successfully, it will create recBuf pointers that will not be cleaned -// up. -// -// We could work around this using the same blockingMetadataFn type logic that -// we use when unsetting a consumer, but it's more finnicky for a producer -// because we want to knife out a single topic. -// -// Leaving a topic buffered even if we failed it as unknown should be of no -// consequence because clients should not really be producing to loads of -// unknown topics. -func (p *producer) failUnknownTopicRecords(unknown *unknownTopicProduces, err error) { - go func() { - for _, pr := range unknown.buffered { - p.cl.finishRecordPromise(pr, err) - } - }() + p.promiseBatch(batchPromise{ + recs: unknown.buffered, + err: err, + }) } // Flush hangs waiting for all buffered records to be flushed, stopping all @@ -941,11 +953,10 @@ func (cl *Client) failBufferedRecords(err error) { toFail = append(toFail, unknown.buffered) } - go func() { - for _, fail := range toFail { - for _, pr := range fail { - cl.finishRecordPromise(pr, err) - } - } - }() + for _, fail := range toFail { + p.promiseBatch(batchPromise{ + recs: fail, + err: err, + }) + } } diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index ef06d2cc..f367ffb1 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -135,8 +135,7 @@ type Record struct { // For producing, this is left unset. This will be set by the client as // appropriate. If you are producing with no acks, this will just be // the offset used in the produce request and does not mirror the - // offset actually stored within Kafka. The offset will not be valid - // unless the record was successfully produced. + // offset actually stored within Kafka. Offset int64 } diff --git a/pkg/kgo/ring.go b/pkg/kgo/ring.go index 347891b6..c0eb5bd2 100644 --- a/pkg/kgo/ring.go +++ b/pkg/kgo/ring.go @@ -211,3 +211,65 @@ func (r *ringSeqResp) dropPeek() (next *seqResp, more, dead bool) { return r.elems[r.head], r.l > 0, r.dead } + +// Also no die; this type is slightly different because we can have overflow. +// If we have overflow, we add to overflow until overflow is drained -- we +// always want strict odering. +type ringBatchPromise struct { + mu sync.Mutex + + elems [eight]batchPromise + + head uint8 + tail uint8 + l uint8 + + overflow []batchPromise +} + +func (r *ringBatchPromise) push(b batchPromise) (first bool) { + r.mu.Lock() + defer r.mu.Unlock() + + // If the ring is full, we go into overflow; if overflow is non-empty, + // for ordering purposes, we add to the end of overflow. We only go + // back to using the ring once overflow is finally empty. + if r.l == eight || len(r.overflow) > 0 { + r.overflow = append(r.overflow, b) + return false + } + + r.elems[r.tail] = b + r.tail = (r.tail + 1) & mask7 + r.l++ + + return r.l == 1 +} + +func (r *ringBatchPromise) dropPeek() (next batchPromise, more bool) { + r.mu.Lock() + defer r.mu.Unlock() + + // We always drain the ring first. If the ring is ever empty, there + // must be overflow: we would not be here if the ring is not-empty. + if r.l > 1 { + r.elems[r.head] = batchPromise{} + r.head = (r.head + 1) & mask7 + r.l-- + return r.elems[r.head], true + } else if r.l == 1 { + r.elems[r.head] = batchPromise{} + r.head = (r.head + 1) & mask7 + r.l-- + if len(r.overflow) == 0 { + return next, false + } + return r.overflow[0], true + } else { + r.overflow = r.overflow[1:] + if len(r.overflow) > 0 { + return r.overflow[0], true + } + return next, false + } +} diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index a52f22ee..9c13f50c 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -841,23 +841,20 @@ func (cl *Client) finishBatch(batch *recBatch, producerID int64, producerEpoch i batch.records = nil batch.mu.Unlock() - for i, pr := range records { - pr.Offset = baseOffset + int64(i) - pr.Partition = partition - pr.ProducerID = producerID - pr.ProducerEpoch = producerEpoch - + cl.producer.promiseBatch(batchPromise{ + baseOffset: baseOffset, + pid: producerID, + epoch: producerEpoch, // A recBuf.attrs is updated when appending to be written. For // v0 && v1 produce requests, we set bit 8 in the attrs // corresponding to our own RecordAttr's bit 8 being no // timestamp type. Thus, we can directly convert the batch // attrs to our own RecordAttrs. - pr.Attrs = RecordAttrs{uint8(attrs)} - - cl.finishRecordPromise(pr, err) - records[i] = promisedRec{} - } - cl.prsPool.put(records) + attrs: RecordAttrs{uint8(attrs)}, + partition: partition, + recs: records, + err: err, + }) } // handleRetryBatches sets any first-buf-batch to failing and triggers a @@ -1064,7 +1061,7 @@ func (recBuf *recBuf) bufferRecord(pr promisedRec, abortOnNewBatch bool) bool { pr.Timestamp = time.Now().Truncate(time.Millisecond) if recBuf.purged { - recBuf.cl.finishRecordPromise(pr, errPurged) + recBuf.cl.producer.promiseRecord(pr, errPurged) return true } @@ -1089,7 +1086,7 @@ func (recBuf *recBuf) bufferRecord(pr promisedRec, abortOnNewBatch bool) bool { return false case appended: // we return true below default: // processed as failure - recBuf.cl.finishRecordPromise(pr, kerr.MessageTooLarge) + recBuf.cl.producer.promiseRecord(pr, kerr.MessageTooLarge) return true } @@ -1222,11 +1219,10 @@ func (recBuf *recBuf) failAllRecords(err error) { batch.records = nil batch.mu.Unlock() - for i, pr := range records { - recBuf.cl.finishRecordPromise(pr, err) - records[i] = promisedRec{} - } - recBuf.cl.prsPool.put(records) + recBuf.cl.producer.promiseBatch(batchPromise{ + recs: records, + err: err, + }) } recBuf.resetBatchDrainIdx() atomic.StoreInt64(&recBuf.buffered, 0) diff --git a/pkg/kgo/topics_and_partitions.go b/pkg/kgo/topics_and_partitions.go index cf66fe7a..1f7671b2 100644 --- a/pkg/kgo/topics_and_partitions.go +++ b/pkg/kgo/topics_and_partitions.go @@ -296,7 +296,10 @@ func (cl *Client) storePartitionsUpdate(topic string, l *topicPartitions, lv *to close(unknown.wait) // allow waiting goroutine to quit if len(lv.partitions) == 0 { - cl.producer.failUnknownTopicRecords(unknown, lv.loadErr) + cl.producer.promiseBatch(batchPromise{ + recs: unknown.buffered, + err: lv.loadErr, + }) } else { for _, pr := range unknown.buffered { cl.doPartitionRecord(l, lv, pr) diff --git a/pkg/kgo/txn_test.go b/pkg/kgo/txn_test.go index 7c52e0e2..85d3a07f 100644 --- a/pkg/kgo/txn_test.go +++ b/pkg/kgo/txn_test.go @@ -7,7 +7,6 @@ import ( "fmt" "os" "strconv" - "sync" "sync/atomic" "testing" "time" @@ -32,6 +31,7 @@ func TestTxnEtl(t *testing.T) { WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)), TransactionalID("p"+randsha()), TransactionTimeout(2*time.Minute), + MaxBufferedRecords(10000), ) if err != nil { panic(err) @@ -39,7 +39,6 @@ func TestTxnEtl(t *testing.T) { defer cl.Close() - var offsetsMu sync.Mutex offsets := make(map[int32]int64) partsUsed := make(map[int32]struct{}) @@ -91,15 +90,13 @@ func TestTxnEtl(t *testing.T) { errs <- fmt.Errorf("unexpected out of order key; got %s != exp %v", r.Key, myKey) } - // ensure the offsets for this partition are contiguous - offsetsMu.Lock() + // ensure the offsets for this partition are monotonically increasing current, ok := offsets[r.Partition] if ok && r.Offset <= current { errs <- fmt.Errorf("partition %d produced offsets out of order, got %d != exp %d", r.Partition, r.Offset, current+1) } offsets[r.Partition] = r.Offset partsUsed[r.Partition] = struct{}{} - offsetsMu.Unlock() }, ) } @@ -150,6 +147,7 @@ func (c *testConsumer) transact(txnsBeforeQuit int) { ConsumeTopics(c.consumeFrom), FetchIsolationLevel(ReadCommitted()), Balancers(c.balancer), + MaxBufferedRecords(10000), ) defer txnSess.Close()