Skip to content

Commit

Permalink
FirstErrPromise: change semantics
Browse files Browse the repository at this point in the history
- we can now abort when we receive an error
- we now wait for all produced records to flush
  • Loading branch information
twmb committed May 21, 2021
1 parent fce380d commit 467e739
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 80 deletions.
10 changes: 3 additions & 7 deletions examples/transactions/eos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,10 @@ func inputProducer() {
msg = "abort "
}

var e kgo.FirstErrPromise
e := kgo.AbortingFirstErrPromise(cl)
for i := 0; i < 10; i++ {
cl.Produce(ctx, kgo.StringRecord(msg+strconv.Itoa(i)), e.Promise)
cl.Produce(ctx, kgo.StringRecord(msg+strconv.Itoa(i)), e.Promise())
}
if err := cl.Flush(ctx); err != nil {
die("Flush only returns error if the context is canceled: %v", err)
}

commit := kgo.TransactionEndTry(doCommit && e.Err() == nil)

switch err := cl.EndTransaction(ctx, commit); err {
Expand Down Expand Up @@ -139,7 +135,7 @@ func eosConsumer() {
die("unable to start transaction: %v", err)
}

var e kgo.FirstErrPromise
e := kgo.AbortingFirstErrPromise(cl)
fetches.EachRecord(func(r *kgo.Record) {
sess.Produce(ctx, kgo.StringRecord("eos "+string(r.Value)), e.Promise)
})
Expand Down
60 changes: 40 additions & 20 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type producer struct {
// recBuf could be created and records sent to while we are flushing.
flushing int32 // >0 if flushing, can Flush many times concurrently

aborting uint32 // 1 means yes
aborting int32 // >0 if aborting, can abort many times concurrently

idMu sync.Mutex
idVersion int16
Expand Down Expand Up @@ -65,7 +65,7 @@ func (p *producer) init() {
p.notifyCond = sync.NewCond(&p.notifyMu)
}

func (p *producer) isAborting() bool { return atomic.LoadUint32(&p.aborting) == 1 }
func (p *producer) isAborting() bool { return atomic.LoadInt32(&p.aborting) > 0 }

func noPromise(*Record, error) {}

Expand Down Expand Up @@ -131,45 +131,62 @@ func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults
}

// FirstErrPromise is a helper type to capture only the first failing error
// when producing a batch of records with this type's promise function.
// when producing a batch of records with this type's Promise function.
//
// This is useful for when you only care about any record failing, and can use
// that as a signal (i.e., to abort a batch).
// that as a signal (i.e., to abort a batch). The AbortingFirstErrPromise
// function can be used to abort all records as soon as the first error is
// encountered. If you do not need to abort, you can use this type with no
// constructor.
//
// This is similar to using ProduceResult's FirstErr function.
type FirstErrPromise struct {
wg sync.WaitGroup
once uint32
mu sync.Mutex
err error
cl *Client
}

// AbortingFirstErrPromise returns a FirstErrPromise that will call the
// client's AbortBufferedRecords function if an error is encountered.
//
// This can be used to quickly exit when any error is encountered, rather than
// waiting while flushing only to discover things errored.
func AbortingFirstErrPromise(cl *Client) *FirstErrPromise {
return &FirstErrPromise{
cl: cl,
}
}

// Promise is a promise for producing that will store the first error
// encountered.
func (f *FirstErrPromise) Promise(_ *Record, err error) {
func (f *FirstErrPromise) promise(_ *Record, err error) {
defer f.wg.Done()
if err != nil && atomic.SwapUint32(&f.once, 1) == 0 {
f.mu.Lock()
f.err = err
f.mu.Unlock()
if f.cl != nil {
f.wg.Add(1)
go func() {
defer f.wg.Done()
f.cl.AbortBufferedRecords(context.Background())
}()
}
}
}

// PromiseFn returns a promise for producing that will store the first error
// Promise returns a promise for producing that will store the first error
// encountered.
//
// This is provided as an alternative to just Promise for people less familiar
// with passing a type's method as an argument.
func (f *FirstErrPromise) PromiseFn() func(*Record, error) {
return f.Promise
// The returned promise must eventually be called, because a FirstErrPromise
// does not return from 'Err' until all promises are completed.
func (f *FirstErrPromise) Promise() func(*Record, error) {
f.wg.Add(1)
return f.promise
}

// Err returns the stored error, if any.
//
// This is safe to use at any time, but for the purpose of this type, this This
// should only be used after any records using this promise have finished (i.e.,
// the client has been flushed or records have been aborted).
// Err waits for all promises to complete and then returns any stored error.
func (f *FirstErrPromise) Err() error {
f.mu.Lock()
defer f.mu.Unlock()
f.wg.Wait()
return f.err
}

Expand Down Expand Up @@ -673,6 +690,9 @@ func (cl *Client) failUnknownTopicRecords(topic string, unknown *unknownTopicPro
// lingers if necessary.
//
// If the context finishes (Done), this returns the context's error.
//
// This function is safe to call multiple times concurrently, and safe to call
// concurrent with Flush.
func (cl *Client) Flush(ctx context.Context) error {
p := &cl.producer

Expand Down
71 changes: 18 additions & 53 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ func (s *GroupTransactSession) failed() bool {
// successful. If commit is false, the group has rebalanced, or any partition
// in committing offsets fails, this aborts.
//
// This function calls Flush or AbortBufferedRecords depending on the commit
// status. If you are flushing, it is strongly recommended to Flush yourself
// before calling this, so that you can then determine if you need to abort.
//
// This returns whether the transaction committed or any error that occurred.
// No returned error is retriable. Either the transactional ID has entered a
// failed state, or the client retried so much that the retry limit was hit,
Expand Down Expand Up @@ -357,73 +361,34 @@ func (cl *Client) BeginTransaction() error {
}

// AbortBufferedRecords fails all unflushed records with ErrAborted and waits
// for there to be no buffered records. It is likely necessary to call
// ResetProducerID after this function; these two functions should only be
// called when not concurrently producing and only if you know what you are
// doing.
// for there to be no buffered records.
//
// This accepts a context to quit the wait early, but it is strongly
// recommended to always wait for all records to be flushed. Waits should not
// occur. The only case where this function returns an error is if the context
// is canceled while flushing.
//
// The intent of this function is to provide a way to clear the client's
// production backlog.
//
// For example, before aborting a transaction and beginning a new one, it would
// be erroneous to not wait for the backlog to clear before beginning a new
// transaction. Anything not cleared may be a part of the new transaction.
// production backlog. For example, before aborting a transaction and
// beginning a new one, it would be erroneous to not wait for the backlog to
// clear before beginning a new transaction. Anything not cleared may be a part
// of the new transaction.
//
// Records produced during or after a call to this function may not be failed,
// thus it is incorrect to concurrently produce with this function.
//
// This function is safe to call multiple times concurrently, and safe to call
// concurrent with Flush.
func (cl *Client) AbortBufferedRecords(ctx context.Context) error {
p := &cl.producer

atomic.StoreUint32(&p.aborting, 1)
defer atomic.StoreUint32(&p.aborting, 0)
atomic.AddInt32(&p.flushing, 1) // disallow lingering to start
defer atomic.AddInt32(&p.flushing, -1)
// At this point, all drain loops that start will immediately stop,
// thus they will not begin any AddPartitionsToTxn request. We must
// now wait for any req currently built to be done being issued.
atomic.AddInt32(&cl.producer.aborting, 1)
defer atomic.AddInt32(&cl.producer.aborting, -1)

cl.cfg.logger.Log(LogLevelInfo, "aborting buffered records")
cl.cfg.logger.Log(LogLevelInfo, "producer state set to aborting; continuing to wait via flushing")
defer cl.cfg.logger.Log(LogLevelDebug, "aborted buffered records")

// Similar to flushing, we unlinger; nothing will start a linger because
// the flushing atomic is non-zero.
if cl.cfg.linger > 0 || cl.cfg.manualFlushing {
for _, parts := range p.topics.load() {
for _, part := range parts.load().partitions {
part.records.unlingerAndManuallyDrain()
}
}
}

// We have to wait for all buffered records to either be flushed
// or to safely abort themselves.
quit := false
done := make(chan struct{})
go func() {
p.notifyMu.Lock()
defer p.notifyMu.Unlock()
defer close(done)

for !quit && atomic.LoadInt64(&p.bufferedRecords) > 0 {
p.notifyCond.Wait()
}
}()

select {
case <-done:
return nil
case <-ctx.Done():
p.notifyMu.Lock()
quit = true
p.notifyMu.Unlock()
p.notifyCond.Broadcast()
return ctx.Err()
}
// Setting the aborting state allows records to fail before
// or after produce requests; thus, now we just flush.
return cl.Flush(ctx)
}

// EndTransaction ends a transaction and resets the client's internal state to
Expand Down

0 comments on commit 467e739

Please sign in to comment.