diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go
index ea7f2c30..97e76b93 100644
--- a/pkg/kgo/config.go
+++ b/pkg/kgo/config.go
@@ -109,7 +109,7 @@ type cfg struct {
 	maxRecordBatchBytes int32
 	maxBufferedRecords  int64
 	produceTimeout      time.Duration
-	produceRetries      int64
+	recordRetries       int64
 	linger         time.Duration
 	recordTimeout  time.Duration
 	manualFlushing bool
@@ -414,7 +414,7 @@ func defaultCfg() cfg {
 		maxRecordBatchBytes: 1000000, // Kafka max.message.bytes default is 1000012
 		maxBufferedRecords:  math.MaxInt64,
 		produceTimeout:      30 * time.Second,
-		produceRetries:      math.MaxInt64, // effectively unbounded
+		recordRetries:       math.MaxInt64, // effectively unbounded
 		partitioner:         StickyKeyPartitioner(nil), // default to how Kafka partitions
 
 		//////////////
@@ -586,7 +586,7 @@ func RetryBackoffFn(backoff func(int) time.Duration) Opt {
 // overriding the default of 20.
 //
 // This option does not apply to produce requests; to limit produce request
-// retries, see ProduceRetries.
+// retries / record retries, see RecordRetries.
 func RequestRetries(n int) Opt {
 	return clientOpt{func(cfg *cfg) { cfg.retries = int64(n) }}
 }
@@ -809,8 +809,8 @@ func ProduceRequestTimeout(limit time.Duration) ProducerOpt {
 	return producerOpt{func(cfg *cfg) { cfg.produceTimeout = limit }}
 }
 
-// ProduceRetries sets the number of tries for producing records, overriding
-// the unlimited default.
+// RecordRetries sets the number of tries for producing records, overriding the
+// unlimited default.
 //
 // If idempotency is enabled (as it is by default), this option is only
 // enforced if it is safe to do so without creating invalid sequence numbers.
@@ -819,8 +819,8 @@ func ProduceRequestTimeout(limit time.Duration) ProducerOpt {
 //
 // This option is different from RequestRetries to allow finer grained control
 // of when to fail when producing records.
-func ProduceRetries(n int) ProducerOpt {
-	return producerOpt{func(cfg *cfg) { cfg.produceRetries = int64(n) }}
+func RecordRetries(n int) ProducerOpt {
+	return producerOpt{func(cfg *cfg) { cfg.recordRetries = int64(n) }}
 }
 
 // StopOnDataLoss sets the client to stop producing if data loss is detected,
diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go
index 4cdc776b..237e7da6 100644
--- a/pkg/kgo/producer.go
+++ b/pkg/kgo/producer.go
@@ -695,7 +695,7 @@ func (cl *Client) waitUnknownTopic(
 		}
 		cl.cfg.logger.Log(LogLevelInfo, "new topic metadata wait failed, retrying wait", "topic", topic, "err", retriableErr)
 		tries++
-		if int64(tries) >= cl.cfg.produceRetries {
+		if int64(tries) >= cl.cfg.recordRetries {
 			err = fmt.Errorf("no partitions available after attempting to refresh metadata %d times, last err: %w", tries, retriableErr)
 		}
 	}
diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index 43eeeb88..5f53dac7 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -712,7 +712,7 @@ func (s *sink) handleReqRespBatch(
 	switch {
 	case kerr.IsRetriable(err) &&
 		err != kerr.CorruptMessage &&
-		batch.tries < s.cl.cfg.produceRetries:
+		batch.tries < s.cl.cfg.recordRetries:
 
 		if debug {
 			fmt.Fprintf(b, "retrying@%d,%d(%s)}, ", baseOffset, nrec, err)
@@ -824,7 +824,7 @@ func (s *sink) handleReqRespBatch(
 			"partition", partition,
 			"err", err,
 			"err_is_retriable", kerr.IsRetriable(err),
-			"max_retries_reached", batch.tries >= s.cl.cfg.produceRetries,
+			"max_retries_reached", batch.tries >= s.cl.cfg.recordRetries,
 		)
 	}
 	s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, partition, baseOffset, err)
@@ -1274,7 +1274,7 @@ func (b *recBatch) maybeFailErr(cfg *cfg) error {
 	}
 	if b.isTimedOut(cfg.recordTimeout) {
 		return errRecordTimeout
-	} else if b.tries >= cfg.produceRetries {
+	} else if b.tries >= cfg.recordRetries {
 		return errRecordRetries
 	} else if b.owner.cl.producer.isAborting() {
 		return ErrAborting