From 7c9f591a2e6f79a75e43d56668b95aad717a759d Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 27 Apr 2021 00:17:35 -0600 Subject: [PATCH] producer: better ProduceSync api This allows for producing either one record or a batch of records, meaning ProduceSync is now more useful for more than just the infrequent-produce case. --- CHANGELOG.md | 2 +- pkg/kgo/producer.go | 74 +++++++++++++++++++++++++++++++-------------- 2 files changed, 53 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 474f39e1..96ad8fa6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ tip === -- [`db983d63`](https://github.com/twmb/franz-go/commit/db983d63) **feature** producer: add ProduceSync +- [`fb315873`](https://github.com/twmb/franz-go/commit/fb315873) **feature** producer: add ProduceSync v0.6.14 === diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 6e9c3242..62dc4321 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -82,44 +82,74 @@ func (p *producer) isAborting() bool { return atomic.LoadUint32(&p.aborting) == func noPromise(*Record, error) {} +// ProduceResult is the result of producing a record in a synchronous manner. +type ProduceResult struct { + // Record is the produced record. It is always non-nil. + // + // If this record was produced successfully, its attrs / offset / id / + // epoch / etc. fields are filled in on return if possible (i.e. when + // producing with acks required). + Record *Record + + // Err is a potential produce error. If this is non-nil, the record was + // not produced successfully. + Err error +} + +// ProduceResults is a collection of produce results. +type ProduceResults []ProduceResult + +// FirstErr returns the first erroring result, if any. +func (rs ProduceResults) FirstErr() error { + for _, r := range rs { + if r.Err != nil { + return r.Err + } + } + return nil +} + // ProduceSync is a synchronous produce. Please see the Produce documentation // for an in depth description of how producing works. // -// Note that it is heavily recommended to not use ProduceSync. Producing -// buffers multiple records into a single request issued to Kafka. A -// synchronous produce implies you may be producing one record per request, -// which is inefficient, slower, and puts more load on Kafka itself. -// -// This function should only be used when producing infrequently enough that -// waiting for a single record to be produced is what would happen anyway with -// Produce. -// -// If the produce is successful, the record's attrs / offset / etc. fields are -// updated appropriately. -func (cl *Client) ProduceSync(ctx context.Context, r *Record) error { +// This function produces all records in one range loop and waits for them all +// to be produced before returning. +func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults { var ( wg sync.WaitGroup - err error - promise = func(_ *Record, perr error) { - err = perr + mu sync.Mutex + results = make(ProduceResults, 0, len(rs)) + promise = func(r *Record, err error) { + mu.Lock() + results = append(results, ProduceResult{r, err}) + mu.Unlock() wg.Done() } ) - wg.Add(1) - if perr := cl.Produce(ctx, r, promise); perr != nil { - return perr + + wg.Add(len(rs)) + for _, r := range rs { + if err := cl.Produce(ctx, r, promise); err != nil { + promise(r, err) + } } wg.Wait() - return err + + return results } // Produce sends a Kafka record to the topic in the record's Topic field, // calling promise with the record or an error when Kafka replies. For a -// synchronous produce (which is not recommended), see ProduceSync. +// synchronous produce, see ProduceSync. // // The promise is optional, but not using it means you will not know if Kafka -// recorded a record properly. If there was no produce error, the record's -// attrs / offset / etc. fields are updated appropriately. +// recorded a record properly. Promises are called in order if the record is +// buffered to a topic that has loaded successfully. Topics that fail loading, +// or records that cannot be buffered, may not have their promises called in +// order. Promises may be called concurrently. +// +// If there was no produce error, the record's attrs / offset / etc. fields are +// updated appropriately before a promise is called. // // If the record is too large to fit in a batch on its own in a produce // request, the promise is called immediately before this function returns with