Skip to content

Commit

Permalink
UnknownTopicRetries: allow -1 to disable the option
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed Apr 30, 2022
1 parent 0fa9fb7 commit 96d647a
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 2 deletions.
2 changes: 2 additions & 0 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,8 @@ func RecordRetries(n int) ProducerOpt {
// exist, and if we repeatedly see that the topic does not exist across
// multiple metadata queries (which are going to different brokers), then we
// may as well stop trying and fail the records.
//
// If this is -1, the client never fails records with this error.
func UnknownTopicRetries(n int) ProducerOpt {
return producerOpt{func(cfg *cfg) { cfg.maxUnknownFailures = int64(n) }}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ func (cl *Client) waitUnknownTopic(
if int64(tries) >= cl.cfg.recordRetries {
err = fmt.Errorf("no partitions available after attempting to refresh metadata %d times, last err: %w", tries, retriableErr)
}
if errors.Is(retriableErr, kerr.UnknownTopicOrPartition) {
if cl.cfg.maxUnknownFailures >= 0 && errors.Is(retriableErr, kerr.UnknownTopicOrPartition) {
unknownTries++
if unknownTries > cl.cfg.maxUnknownFailures {
err = retriableErr
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1214,7 +1214,7 @@ func (recBuf *recBuf) checkUnknownFailLimit(err error) bool {
} else {
recBuf.unknownFailures = 0
}
return recBuf.unknownFailures > recBuf.cl.cfg.maxUnknownFailures
return recBuf.cl.cfg.maxUnknownFailures >= 0 && recBuf.unknownFailures > recBuf.cl.cfg.maxUnknownFailures
}

// failAllRecords fails all buffered records in this recBuf.
Expand Down

0 comments on commit 96d647a

Please sign in to comment.