From e324b560415d8cfcf74cb7e7b9f8600f83c68069 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 25 Apr 2021 10:43:27 -0600 Subject: [PATCH] producing: evaluate whether a batch should fail before and after We were inconsistent on evaluating whether a batch should fail. Before writing a request, we would check if a record ctx expired. After, we would evaluate the record timeout and the produce retries. We may as well evaluate all three at once both before and after for consistency. We also now allow checking whether records should be failed before writing a request in the non-idempotent context as well. --- pkg/kgo/config.go | 5 ++--- pkg/kgo/errors.go | 2 ++ pkg/kgo/producer.go | 4 ++-- pkg/kgo/sink.go | 43 ++++++++++++++++++++++++++----------------- 4 files changed, 32 insertions(+), 22 deletions(-) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 3f83fffc..b02e246f 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -755,9 +755,8 @@ func ManualFlushing() ProducerOpt { // this option with lingering. In that case, simply add the linger to the // record timeout to avoid problems. // -// The timeout is only evaluated after a produce response, and only for batches -// that need to be retried. Thus, a sink backoff may delay record timeout -// slightly. As with lingering, this also should generally be a non-issue. +// The timeout is only evaluated evaluated before writing a request or after a +// produce response. Thus, a sink backoff may delay record timeout slightly. func RecordTimeout(timeout time.Duration) ProducerOpt { return producerOpt{func(cfg *cfg) { cfg.recordTimeout = timeout }} } diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index 37c7639b..13dae7c2 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -73,6 +73,8 @@ var ( // configured record timeout limit. errRecordTimeout = errors.New("records have timed out before they were able to be produced") + errRecordRetries = errors.New("record failed after being retried too many times") + errClientClosing = errors.New("client closing") ////////////// diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index f2c02247..8aa5ee51 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -102,8 +102,8 @@ func noPromise(*Record, error) {} // records (to avoid invalid sequence numbers), all buffered records for a // partition are aborted. The context checked for doneness is always the first // buffered record's context. If that record is successfully produced, the -// context will then be the next first buffered record. The context is only -// evaluated before writing a produce request. +// context will then be the next first buffered record. The context is +// evaluated before or after writing a request. // // The first buffered record for an unknown topic begins a timeout for the // configured record timeout limit; all records buffered within the wait will diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 24099096..14d6ee66 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -3,7 +3,6 @@ package kgo import ( "bytes" "context" - "errors" "fmt" "hash/crc32" "strings" @@ -1113,8 +1112,8 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) { recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, unable to produce on this partition", "broker", recBuf.sink.nodeID, "topic", recBuf.topic, "partition", recBuf.partition, "err", err) batch0 := recBuf.batches[0] batch0.tries++ - if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) && - (batch0.isTimedOut(recBuf.cl.cfg.recordTimeout) || batch0.tries > recBuf.cl.cfg.produceRetries || !kerr.IsRetriable(err)) { + failErr := batch0.maybeFailErr(&recBuf.cl.cfg) + if (!recBuf.cl.idempotent() || batch0.canFailFromLoadErrs) && (!kerr.IsRetriable(err) || failErr != nil) { recBuf.failAllRecords(err) } } @@ -1206,6 +1205,24 @@ type recBatch struct { records []promisedNumberedRecord } +// Returns an error if the batch should fail. +func (b *recBatch) maybeFailErr(cfg *cfg) error { + if len(b.records) > 0 { + ctx := b.records[0].ctx + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + } + if b.isTimedOut(cfg.recordTimeout) { + return errRecordTimeout + } else if b.tries >= cfg.produceRetries { + return errRecordRetries + } + return nil +} + func (b *recBatch) v0wireLength() int32 { return b.v1wireLength - 8 } // no timestamp func (b *recBatch) batchLength() int32 { return b.wireLength - 4 } // no length prefix func (b *recBatch) flexibleWireLength() int32 { // uvarint length prefix @@ -1216,7 +1233,8 @@ func (b *recBatch) flexibleWireLength() int32 { // uvarint length prefix // appendRecord saves a new record to a batch. // // This is called under the owning recBuf's mu, meaning records cannot be -// concurrently modified by failing. +// concurrently modified by failing. This batch cannot actively be used +// in a request, so we do not need to worry about a concurrent read. func (b *recBatch) appendRecord(pr promisedRec, nums recordNumbers) { b.wireLength += nums.wireLength() b.v1wireLength += messageSet1Length(pr.Record) @@ -1329,13 +1347,10 @@ func (r *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch } if recBuf.batches[0] == batch { - if batch.canFailFromLoadErrs { - ctx := batch.records[0].ctx - select { - case <-ctx.Done(): - recBuf.failAllRecords(ctx.Err()) + if !r.idempotent || batch.canFailFromLoadErrs { + if err := batch.maybeFailErr(&batch.owner.cl.cfg); err != nil { + recBuf.failAllRecords(err) return false - default: } } if recBuf.needSeqReset { @@ -1399,13 +1414,7 @@ func (rbs seqRecBatches) tryResetFailingBatchesWith(cfg *cfg, canFail bool, fn f batch.owner.mu.Lock() if batch.isOwnersFirstBatch() { if canFail || cfg.disableIdempotency { - var err error - if batch.isTimedOut(cfg.recordTimeout) { - err = errRecordTimeout - } else if batch.tries >= cfg.produceRetries { - err = errors.New("record failed after being retried too many times") - } - if err != nil { + if err := batch.maybeFailErr(cfg); err != nil { batch.owner.failAllRecords(err) batch.owner.mu.Unlock() continue