diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 304529c4..f584f39b 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -350,6 +350,8 @@ func (cl *Client) OptValues(opt any) []any { return []any{cfg.maxConcurrentFetches} case namefn(Rack): return []any{cfg.rack} + case namefn(KeepRetryableFetchErrors): + return []any{cfg.keepRetryableFetchErrors} case namefn(AdjustFetchOffsetsFn): return []any{cfg.adjustOffsetsBeforeAssign} diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index c83ea357..011d347a 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -143,7 +143,7 @@ type cfg struct { maxConcurrentFetches int disableFetchSessions bool - keepFetchRetryableErrors bool + keepRetryableFetchErrors bool topics map[string]*regexp.Regexp // topics to consume; if regex is true, values are compiled regular expressions partitions map[string]map[int32]Offset // partitions to directly consume from @@ -1349,7 +1349,7 @@ func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt { return consumerOpt{func(cfg *cfg) { cfg.preferLagFn = fn }} } -// KeepFetchRetryableErrors switches the client to always return any retryable +// KeepRetryableFetchErrors switches the client to always return any retryable // broker error when fetching, rather than stripping them. By default, the // client strips retryable errors from fetch responses; these are usually // signals that a client needs to update its metadata to learn of where a @@ -1359,10 +1359,8 @@ func ConsumePreferringLagFn(fn PreferLagFn) ConsumerOpt { // events. For example, if you want to react to you yourself deleting a topic, // you can watch for either UNKNOWN_TOPIC_OR_PARTITION or UNKNOWN_TOPIC_ID // errors being returned in fetches (and ignore the other errors). -// -// TODO not exported / usable yet -func keepFetchRetryableErrors() ConsumerOpt { - return consumerOpt{func(cfg *cfg) { cfg.keepFetchRetryableErrors = true }} +func KeepRetryableFetchErrors() ConsumerOpt { + return consumerOpt{func(cfg *cfg) { cfg.keepRetryableFetchErrors = true }} } ////////////////////////////////// diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go index 2940b1a1..5301dc1b 100644 --- a/pkg/kgo/consumer_direct_test.go +++ b/pkg/kgo/consumer_direct_test.go @@ -158,7 +158,7 @@ func TestIssue434(t *testing.T) { ConsumeTopics(fmt.Sprintf("(%s|%s)", t1, t2)), ConsumeRegex(), FetchMaxWait(100*time.Millisecond), - keepFetchRetryableErrors(), + KeepRetryableFetchErrors(), ) defer cl.Close() diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 12037be2..3b6f8fd3 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -861,7 +861,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe var keep bool switch fp.Err { default: - if kerr.IsRetriable(fp.Err) && !s.cl.cfg.keepFetchRetryableErrors { + if kerr.IsRetriable(fp.Err) && !s.cl.cfg.keepRetryableFetchErrors { // UnknownLeaderEpoch: our meta is newer than the broker we fetched from // OffsetNotAvailable: fetched from out of sync replica or a behind in-sync one (KIP-392 case 1 and case 2) // UnknownTopicID: kafka has not synced the state on all brokers @@ -896,7 +896,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe if fails := partOffset.from.unknownIDFails.Add(1); fails > 5 { partOffset.from.unknownIDFails.Add(-1) keep = true - } else if s.cl.cfg.keepFetchRetryableErrors { + } else if s.cl.cfg.keepRetryableFetchErrors { keep = true } else { numErrsStripped++