From 467e739263f3a2bb349679b80f14ae486ead7338 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 20 May 2021 18:08:36 -0600 Subject: [PATCH] FirstErrPromise: change semantics - we can now abort when we receive an error - we now wait for all produced records to flush --- examples/transactions/eos/main.go | 10 ++--- pkg/kgo/producer.go | 60 +++++++++++++++++--------- pkg/kgo/txn.go | 71 ++++++++----------------------- 3 files changed, 61 insertions(+), 80 deletions(-) diff --git a/examples/transactions/eos/main.go b/examples/transactions/eos/main.go index 4e8567e9..906e8914 100644 --- a/examples/transactions/eos/main.go +++ b/examples/transactions/eos/main.go @@ -71,14 +71,10 @@ func inputProducer() { msg = "abort " } - var e kgo.FirstErrPromise + e := kgo.AbortingFirstErrPromise(cl) for i := 0; i < 10; i++ { - cl.Produce(ctx, kgo.StringRecord(msg+strconv.Itoa(i)), e.Promise) + cl.Produce(ctx, kgo.StringRecord(msg+strconv.Itoa(i)), e.Promise()) } - if err := cl.Flush(ctx); err != nil { - die("Flush only returns error if the context is canceled: %v", err) - } - commit := kgo.TransactionEndTry(doCommit && e.Err() == nil) switch err := cl.EndTransaction(ctx, commit); err { @@ -139,7 +135,7 @@ func eosConsumer() { die("unable to start transaction: %v", err) } - var e kgo.FirstErrPromise + e := kgo.AbortingFirstErrPromise(cl) fetches.EachRecord(func(r *kgo.Record) { sess.Produce(ctx, kgo.StringRecord("eos "+string(r.Value)), e.Promise) }) diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 1639aab4..9ba172f0 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -33,7 +33,7 @@ type producer struct { // recBuf could be created and records sent to while we are flushing. flushing int32 // >0 if flushing, can Flush many times concurrently - aborting uint32 // 1 means yes + aborting int32 // >0 if aborting, can abort many times concurrently idMu sync.Mutex idVersion int16 @@ -65,7 +65,7 @@ func (p *producer) init() { p.notifyCond = sync.NewCond(&p.notifyMu) } -func (p *producer) isAborting() bool { return atomic.LoadUint32(&p.aborting) == 1 } +func (p *producer) isAborting() bool { return atomic.LoadInt32(&p.aborting) > 0 } func noPromise(*Record, error) {} @@ -131,45 +131,62 @@ func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults } // FirstErrPromise is a helper type to capture only the first failing error -// when producing a batch of records with this type's promise function. +// when producing a batch of records with this type's Promise function. // // This is useful for when you only care about any record failing, and can use -// that as a signal (i.e., to abort a batch). +// that as a signal (i.e., to abort a batch). The AbortingFirstErrPromise +// function can be used to abort all records as soon as the first error is +// encountered. If you do not need to abort, you can use this type with no +// constructor. // // This is similar to using ProduceResult's FirstErr function. type FirstErrPromise struct { + wg sync.WaitGroup once uint32 - mu sync.Mutex err error + cl *Client +} + +// AbortingFirstErrPromise returns a FirstErrPromise that will call the +// client's AbortBufferedRecords function if an error is encountered. +// +// This can be used to quickly exit when any error is encountered, rather than +// waiting while flushing only to discover things errored. +func AbortingFirstErrPromise(cl *Client) *FirstErrPromise { + return &FirstErrPromise{ + cl: cl, + } } // Promise is a promise for producing that will store the first error // encountered. -func (f *FirstErrPromise) Promise(_ *Record, err error) { +func (f *FirstErrPromise) promise(_ *Record, err error) { + defer f.wg.Done() if err != nil && atomic.SwapUint32(&f.once, 1) == 0 { - f.mu.Lock() f.err = err - f.mu.Unlock() + if f.cl != nil { + f.wg.Add(1) + go func() { + defer f.wg.Done() + f.cl.AbortBufferedRecords(context.Background()) + }() + } } } -// PromiseFn returns a promise for producing that will store the first error +// Promise returns a promise for producing that will store the first error // encountered. // -// This is provided as an alternative to just Promise for people less familiar -// with passing a type's method as an argument. -func (f *FirstErrPromise) PromiseFn() func(*Record, error) { - return f.Promise +// The returned promise must eventually be called, because a FirstErrPromise +// does not return from 'Err' until all promises are completed. +func (f *FirstErrPromise) Promise() func(*Record, error) { + f.wg.Add(1) + return f.promise } -// Err returns the stored error, if any. -// -// This is safe to use at any time, but for the purpose of this type, this This -// should only be used after any records using this promise have finished (i.e., -// the client has been flushed or records have been aborted). +// Err waits for all promises to complete and then returns any stored error. func (f *FirstErrPromise) Err() error { - f.mu.Lock() - defer f.mu.Unlock() + f.wg.Wait() return f.err } @@ -673,6 +690,9 @@ func (cl *Client) failUnknownTopicRecords(topic string, unknown *unknownTopicPro // lingers if necessary. // // If the context finishes (Done), this returns the context's error. +// +// This function is safe to call multiple times concurrently, and safe to call +// concurrent with Flush. func (cl *Client) Flush(ctx context.Context) error { p := &cl.producer diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 5a6b9267..7abd727c 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -179,6 +179,10 @@ func (s *GroupTransactSession) failed() bool { // successful. If commit is false, the group has rebalanced, or any partition // in committing offsets fails, this aborts. // +// This function calls Flush or AbortBufferedRecords depending on the commit +// status. If you are flushing, it is strongly recommended to Flush yourself +// before calling this, so that you can then determine if you need to abort. +// // This returns whether the transaction committed or any error that occurred. // No returned error is retriable. Either the transactional ID has entered a // failed state, or the client retried so much that the retry limit was hit, @@ -357,10 +361,7 @@ func (cl *Client) BeginTransaction() error { } // AbortBufferedRecords fails all unflushed records with ErrAborted and waits -// for there to be no buffered records. It is likely necessary to call -// ResetProducerID after this function; these two functions should only be -// called when not concurrently producing and only if you know what you are -// doing. +// for there to be no buffered records. // // This accepts a context to quit the wait early, but it is strongly // recommended to always wait for all records to be flushed. Waits should not @@ -368,62 +369,26 @@ func (cl *Client) BeginTransaction() error { // is canceled while flushing. // // The intent of this function is to provide a way to clear the client's -// production backlog. -// -// For example, before aborting a transaction and beginning a new one, it would -// be erroneous to not wait for the backlog to clear before beginning a new -// transaction. Anything not cleared may be a part of the new transaction. +// production backlog. For example, before aborting a transaction and +// beginning a new one, it would be erroneous to not wait for the backlog to +// clear before beginning a new transaction. Anything not cleared may be a part +// of the new transaction. // // Records produced during or after a call to this function may not be failed, // thus it is incorrect to concurrently produce with this function. +// +// This function is safe to call multiple times concurrently, and safe to call +// concurrent with Flush. func (cl *Client) AbortBufferedRecords(ctx context.Context) error { - p := &cl.producer - - atomic.StoreUint32(&p.aborting, 1) - defer atomic.StoreUint32(&p.aborting, 0) - atomic.AddInt32(&p.flushing, 1) // disallow lingering to start - defer atomic.AddInt32(&p.flushing, -1) - // At this point, all drain loops that start will immediately stop, - // thus they will not begin any AddPartitionsToTxn request. We must - // now wait for any req currently built to be done being issued. + atomic.AddInt32(&cl.producer.aborting, 1) + defer atomic.AddInt32(&cl.producer.aborting, -1) - cl.cfg.logger.Log(LogLevelInfo, "aborting buffered records") + cl.cfg.logger.Log(LogLevelInfo, "producer state set to aborting; continuing to wait via flushing") defer cl.cfg.logger.Log(LogLevelDebug, "aborted buffered records") - // Similar to flushing, we unlinger; nothing will start a linger because - // the flushing atomic is non-zero. - if cl.cfg.linger > 0 || cl.cfg.manualFlushing { - for _, parts := range p.topics.load() { - for _, part := range parts.load().partitions { - part.records.unlingerAndManuallyDrain() - } - } - } - - // We have to wait for all buffered records to either be flushed - // or to safely abort themselves. - quit := false - done := make(chan struct{}) - go func() { - p.notifyMu.Lock() - defer p.notifyMu.Unlock() - defer close(done) - - for !quit && atomic.LoadInt64(&p.bufferedRecords) > 0 { - p.notifyCond.Wait() - } - }() - - select { - case <-done: - return nil - case <-ctx.Done(): - p.notifyMu.Lock() - quit = true - p.notifyMu.Unlock() - p.notifyCond.Broadcast() - return ctx.Err() - } + // Setting the aborting state allows records to fail before + // or after produce requests; thus, now we just flush. + return cl.Flush(ctx) } // EndTransaction ends a transaction and resets the client's internal state to