Skip to content

Commit

Permalink
producer: better ProduceSync api
Browse files Browse the repository at this point in the history
This allows for producing either one record or a batch of records,
meaning ProduceSync is now more useful for more than just the
infrequent-produce case.
  • Loading branch information
twmb committed Apr 27, 2021
1 parent 4b4b9dd commit 7c9f591
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 23 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
tip
===

- [`db983d63`](https://github.com/twmb/franz-go/commit/db983d63) **feature** producer: add ProduceSync
- [`fb315873`](https://github.com/twmb/franz-go/commit/fb315873) **feature** producer: add ProduceSync

v0.6.14
===
Expand Down
74 changes: 52 additions & 22 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,44 +82,74 @@ func (p *producer) isAborting() bool { return atomic.LoadUint32(&p.aborting) ==

func noPromise(*Record, error) {}

// ProduceResult is the result of producing a record in a synchronous manner.
type ProduceResult struct {
// Record is the produced record. It is always non-nil.
//
// If this record was produced successfully, its attrs / offset / id /
// epoch / etc. fields are filled in on return if possible (i.e. when
// producing with acks required).
Record *Record

// Err is a potential produce error. If this is non-nil, the record was
// not produced successfully.
Err error
}

// ProduceResults is a collection of produce results.
type ProduceResults []ProduceResult

// FirstErr returns the first erroring result, if any.
func (rs ProduceResults) FirstErr() error {
for _, r := range rs {
if r.Err != nil {
return r.Err
}
}
return nil
}

// ProduceSync is a synchronous produce. Please see the Produce documentation
// for an in depth description of how producing works.
//
// Note that it is heavily recommended to not use ProduceSync. Producing
// buffers multiple records into a single request issued to Kafka. A
// synchronous produce implies you may be producing one record per request,
// which is inefficient, slower, and puts more load on Kafka itself.
//
// This function should only be used when producing infrequently enough that
// waiting for a single record to be produced is what would happen anyway with
// Produce.
//
// If the produce is successful, the record's attrs / offset / etc. fields are
// updated appropriately.
func (cl *Client) ProduceSync(ctx context.Context, r *Record) error {
// This function produces all records in one range loop and waits for them all
// to be produced before returning.
func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults {
var (
wg sync.WaitGroup
err error
promise = func(_ *Record, perr error) {
err = perr
mu sync.Mutex
results = make(ProduceResults, 0, len(rs))
promise = func(r *Record, err error) {
mu.Lock()
results = append(results, ProduceResult{r, err})
mu.Unlock()
wg.Done()
}
)
wg.Add(1)
if perr := cl.Produce(ctx, r, promise); perr != nil {
return perr

wg.Add(len(rs))
for _, r := range rs {
if err := cl.Produce(ctx, r, promise); err != nil {
promise(r, err)
}
}
wg.Wait()
return err

return results
}

// Produce sends a Kafka record to the topic in the record's Topic field,
// calling promise with the record or an error when Kafka replies. For a
// synchronous produce (which is not recommended), see ProduceSync.
// synchronous produce, see ProduceSync.
//
// The promise is optional, but not using it means you will not know if Kafka
// recorded a record properly. If there was no produce error, the record's
// attrs / offset / etc. fields are updated appropriately.
// recorded a record properly. Promises are called in order if the record is
// buffered to a topic that has loaded successfully. Topics that fail loading,
// or records that cannot be buffered, may not have their promises called in
// order. Promises may be called concurrently.
//
// If there was no produce error, the record's attrs / offset / etc. fields are
// updated appropriately before a promise is called.
//
// If the record is too large to fit in a batch on its own in a produce
// request, the promise is called immediately before this function returns with
Expand Down

0 comments on commit 7c9f591

Please sign in to comment.