Skip to content

Commit

Permalink
add FirstErrPromise
Browse files Browse the repository at this point in the history
This is a quality of life improvement; often times, we are only
interested in one error in a batch to signal the status of the entire
batch.

This is the async counterpart to ProduceSync's FirstErr
  • Loading branch information
twmb committed May 11, 2021
1 parent 9b8456a commit aea185e
Showing 1 changed file with 43 additions and 0 deletions.
43 changes: 43 additions & 0 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,49 @@ func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults
return results
}

// FirstErrPromise is a helper type to capture only the first failing error
// when producing a batch of records with this type's promise function.
//
// This is useful for when you only care about any record failing, and can use
// that as a signal (i.e., to abort a batch).
//
// This is similar to using ProduceResult's FirstErr function.
type FirstErrPromise struct {
once uint32
mu sync.Mutex
err error
}

// Promise is a promise for producing that will store the first error
// encountered.
func (f *FirstErrPromise) Promise(_ *Record, err error) {
if err != nil && atomic.SwapUint32(&f.once, 1) == 0 {
f.mu.Lock()
f.err = err
f.mu.Unlock()
}
}

// PromiseFn returns a promise for producing that will store the first error
// encountered.
//
// This is provided as an alternative to just Promise for people less familiar
// with passing a type's method as an argument.
func (f *FirstErrPromise) PromiseFn() func(*Record, error) {
return f.Promise
}

// Err returns the stored error, if any.
//
// This is safe to use at any time, but for the purpose of this type, this This
// should only be used after any records using this promise have finished (i.e.,
// the client has been flushed or records have been aborted).
func (f *FirstErrPromise) Err() error {
f.mu.Lock()
defer f.mu.Unlock()
return f.err
}

// Produce sends a Kafka record to the topic in the record's Topic field,
// calling promise with the record or an error when Kafka replies. For a
// synchronous produce, see ProduceSync.
Expand Down

0 comments on commit aea185e

Please sign in to comment.