From a2340ebb21a42fbd01d695aa2df109854050670e Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 21 Jan 2024 12:36:00 -0700 Subject: [PATCH] pkg/kgo: inject fake fetches on metadata load errors A consumer should be notified when it cannot make progress due to metadata load errors. We now * unconditionally inject a fake fetch if a load error is not retryable * inject a fake fetch if the user has configured KeepRetryableFetchErrors --- pkg/kgo/consumer.go | 8 ++++---- pkg/kgo/consumer_direct_test.go | 28 ++++++++++++++++++++++++++++ pkg/kgo/metadata.go | 6 +++++- 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index b337819c..b99a3c5c 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -354,7 +354,7 @@ func NewErrFetch(err error) Fetches { Topics: []FetchTopic{{ Topic: "", Partitions: []FetchPartition{{ - Partition: 0, + Partition: -1, Err: err, }}, }}, @@ -384,11 +384,11 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches { } // PollRecords waits for records to be available, returning as soon as any -// broker returns records in a fetch. If the context is nil, this function -// will return immediately with any currently buffered records. +// broker returns records in a fetch. If the context is nil, this function will +// return immediately with any currently buffered records. // // If the client is closed, a fake fetch will be injected that has no topic, a -// partition of 0, and a partition error of ErrClientClosed. If the context is +// partition of -1, and a partition error of ErrClientClosed. If the context is // canceled, a fake fetch will be injected with ctx.Err. These injected errors // can be used to break out of a poll loop. // diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go index e8a98204..066fc39f 100644 --- a/pkg/kgo/consumer_direct_test.go +++ b/pkg/kgo/consumer_direct_test.go @@ -9,6 +9,8 @@ import ( "sync/atomic" "testing" "time" + + "github.com/twmb/franz-go/pkg/kerr" ) // Allow adding a topic to consume after the client is initialized with nothing @@ -541,3 +543,29 @@ func TestSetOffsetsForNewTopic(t *testing.T) { cl.Close() } } + +func TestIssue648(t *testing.T) { + t.Parallel() + cl, _ := newTestClient( + MetadataMinAge(100*time.Millisecond), + ConsumeTopics("bizbazbuz"), + FetchMaxWait(time.Second), + KeepRetryableFetchErrors(), + ) + defer cl.Close() + ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) + fs := cl.PollFetches(ctx) + cancel() + + var found bool + fs.EachError(func(_ string, _ int32, err error) { + if !errors.Is(err, kerr.UnknownTopicOrPartition) { + t.Errorf("expected ErrUnknownTopicOrPartition, got %v", err) + } else { + found = true + } + }) + if !found { + t.Errorf("did not see ErrUnknownTopicOrPartition") + } +} diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index 026fb0b1..71b7de07 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -695,6 +695,8 @@ func (cl *Client) mergeTopicPartitions( for _, topicPartition := range lv.partitions { topicPartition.records.bumpRepeatedLoadErr(lv.loadErr) } + } else if !kerr.IsRetriable(r.loadErr) || cl.cfg.keepRetryableFetchErrors { + cl.consumer.addFakeReadyForDraining(topic, -1, r.loadErr, "metadata refresh has a load error on this entire topic") } retryWhy.add(topic, -1, r.loadErr) return @@ -753,7 +755,7 @@ func (cl *Client) mergeTopicPartitions( } newTP := r.partitions[part] - // Like above for the entire topic, an individual partittion + // Like above for the entire topic, an individual partition // can have a load error. Unlike for the topic, individual // partition errors are always retryable. // @@ -765,6 +767,8 @@ func (cl *Client) mergeTopicPartitions( newTP.loadErr = err if isProduce { newTP.records.bumpRepeatedLoadErr(newTP.loadErr) + } else if !kerr.IsRetriable(newTP.loadErr) || cl.cfg.keepRetryableFetchErrors { + cl.consumer.addFakeReadyForDraining(topic, int32(part), newTP.loadErr, "metadata refresh has a load error on this partition") } retryWhy.add(topic, int32(part), newTP.loadErr) continue