Skip to content

Commit

Permalink
breaking: rename ProduceRetries to RecordRetries
Browse files Browse the repository at this point in the history
This more accurately reflect that the retries correspond to the records
and number of errors trying to do something for producing a record. As
well, this has the benefit that now this option is next to RecordTimeout
in the documentation.
  • Loading branch information
twmb committed Aug 17, 2021
1 parent 12808d5 commit 4e5eca8
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
14 changes: 7 additions & 7 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type cfg struct {
maxRecordBatchBytes int32
maxBufferedRecords int64
produceTimeout time.Duration
produceRetries int64
recordRetries int64
linger time.Duration
recordTimeout time.Duration
manualFlushing bool
Expand Down Expand Up @@ -414,7 +414,7 @@ func defaultCfg() cfg {
maxRecordBatchBytes: 1000000, // Kafka max.message.bytes default is 1000012
maxBufferedRecords: math.MaxInt64,
produceTimeout: 30 * time.Second,
produceRetries: math.MaxInt64, // effectively unbounded
recordRetries: math.MaxInt64, // effectively unbounded
partitioner: StickyKeyPartitioner(nil), // default to how Kafka partitions

//////////////
Expand Down Expand Up @@ -586,7 +586,7 @@ func RetryBackoffFn(backoff func(int) time.Duration) Opt {
// overriding the default of 20.
//
// This option does not apply to produce requests; to limit produce request
// retries, see ProduceRetries.
// retries / record retries, see RecordRetries.
func RequestRetries(n int) Opt {
return clientOpt{func(cfg *cfg) { cfg.retries = int64(n) }}
}
Expand Down Expand Up @@ -809,8 +809,8 @@ func ProduceRequestTimeout(limit time.Duration) ProducerOpt {
return producerOpt{func(cfg *cfg) { cfg.produceTimeout = limit }}
}

// ProduceRetries sets the number of tries for producing records, overriding
// the unlimited default.
// RecordRetries sets the number of tries for producing records, overriding the
// unlimited default.
//
// If idempotency is enabled (as it is by default), this option is only
// enforced if it is safe to do so without creating invalid sequence numbers.
Expand All @@ -819,8 +819,8 @@ func ProduceRequestTimeout(limit time.Duration) ProducerOpt {
//
// This option is different from RequestRetries to allow finer grained control
// of when to fail when producing records.
func ProduceRetries(n int) ProducerOpt {
return producerOpt{func(cfg *cfg) { cfg.produceRetries = int64(n) }}
func RecordRetries(n int) ProducerOpt {
return producerOpt{func(cfg *cfg) { cfg.recordRetries = int64(n) }}
}

// StopOnDataLoss sets the client to stop producing if data loss is detected,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ func (cl *Client) waitUnknownTopic(
}
cl.cfg.logger.Log(LogLevelInfo, "new topic metadata wait failed, retrying wait", "topic", topic, "err", retriableErr)
tries++
if int64(tries) >= cl.cfg.produceRetries {
if int64(tries) >= cl.cfg.recordRetries {
err = fmt.Errorf("no partitions available after attempting to refresh metadata %d times, last err: %w", tries, retriableErr)
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ func (s *sink) handleReqRespBatch(
switch {
case kerr.IsRetriable(err) &&
err != kerr.CorruptMessage &&
batch.tries < s.cl.cfg.produceRetries:
batch.tries < s.cl.cfg.recordRetries:

if debug {
fmt.Fprintf(b, "retrying@%d,%d(%s)}, ", baseOffset, nrec, err)
Expand Down Expand Up @@ -824,7 +824,7 @@ func (s *sink) handleReqRespBatch(
"partition", partition,
"err", err,
"err_is_retriable", kerr.IsRetriable(err),
"max_retries_reached", batch.tries >= s.cl.cfg.produceRetries,
"max_retries_reached", batch.tries >= s.cl.cfg.recordRetries,
)
}
s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, partition, baseOffset, err)
Expand Down Expand Up @@ -1274,7 +1274,7 @@ func (b *recBatch) maybeFailErr(cfg *cfg) error {
}
if b.isTimedOut(cfg.recordTimeout) {
return errRecordTimeout
} else if b.tries >= cfg.produceRetries {
} else if b.tries >= cfg.recordRetries {
return errRecordRetries
} else if b.owner.cl.producer.isAborting() {
return ErrAborting
Expand Down

0 comments on commit 4e5eca8

Please sign in to comment.