From 17567b0a8666ad1a5814c0665a1fc979332cb1fc Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 7 Feb 2023 21:50:48 -0700 Subject: [PATCH] kgo: always strip retryable errors when consuming UnknownTopicID is now returned frequently on Kafka 3.4, and it looks due to topic state not being synced immediately on all brokers. This was causing TestIssue337 to fail due to an unexpected error in the fetch responses. Now, we just strip all retryable errors. We also improve the test a bit. --- pkg/kgo/consumer_direct_test.go | 33 +++++++++++++++++++++++---------- pkg/kgo/source.go | 29 ++++++++++++++--------------- 2 files changed, 37 insertions(+), 25 deletions(-) diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go index abd4d9b5..53a81673 100644 --- a/pkg/kgo/consumer_direct_test.go +++ b/pkg/kgo/consumer_direct_test.go @@ -7,6 +7,8 @@ import ( ) func TestIssue325(t *testing.T) { + t.Parallel() + topic, cleanup := tmpTopic(t) defer cleanup() @@ -28,6 +30,8 @@ func TestIssue325(t *testing.T) { } func TestIssue337(t *testing.T) { + t.Parallel() + topic, cleanup := tmpTopicPartitions(t, 2) defer cleanup() @@ -49,21 +53,30 @@ func TestIssue337(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() + var recs []*Record +out: for { fs := cl.PollFetches(ctx) - if err := fs.Err0(); err == context.DeadlineExceeded { - break - } - recs := fs.Records() - if len(recs) != 1 { - t.Fatalf("incorrect number of records, saw: %v", len(recs)) - } else if string(recs[0].Value) != "foo" { - t.Fatalf("wrong value, got: %s", recs[0].Value) + switch err := fs.Err0(); err { + default: + t.Fatalf("unexpected error: %v", err) + case context.DeadlineExceeded: + break out + case nil: } + recs = append(recs, fs.Records()...) + } + if len(recs) != 1 { + t.Fatalf("incorrect number of records, saw: %v", len(recs)) + } + if string(recs[0].Value) != "foo" { + t.Fatalf("wrong value, got: %s", recs[0].Value) } } func TestDirectPartitionPurge(t *testing.T) { + t.Parallel() + topic, cleanup := tmpTopicPartitions(t, 2) defer cleanup() @@ -84,7 +97,7 @@ func TestDirectPartitionPurge(t *testing.T) { } cl.PurgeTopicsFromClient(topic) - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) fs := cl.PollFetches(ctx) cancel() if err := fs.Err0(); err != context.DeadlineExceeded { @@ -92,7 +105,7 @@ func TestDirectPartitionPurge(t *testing.T) { } cl.AddConsumeTopics(topic) - ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) defer cancel() exp := map[string]bool{ diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index c6418a25..febb23ff 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -839,21 +839,20 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe var keep bool switch fp.Err { default: - // - bad auth - // - unsupported compression - // - unsupported message version - // - unknown error - // - or, no error - keep = true - - case kerr.UnknownTopicOrPartition, - kerr.NotLeaderForPartition, - kerr.ReplicaNotAvailable, - kerr.KafkaStorageError, - kerr.UnknownLeaderEpoch, // our meta is newer than broker we fetched from - kerr.OffsetNotAvailable: // fetched from out of sync replica or a behind in-sync one (KIP-392: case 1 and case 2) - - numErrsStripped++ + if kerr.IsRetriable(fp.Err) { + // UnknownLeaderEpoch: our meta is newer than the broker we fetched from + // OffsetNotAvailable: fetched from out of sync replica or a behind in-sync one (KIP-392 case 1 and case 2) + // UnknownTopicID: kafka has not synced the state on all brokers + // And other standard retryable errors. + numErrsStripped++ + } else { + // - bad auth + // - unsupported compression + // - unsupported message version + // - unknown error + // - or, no error + keep = true + } case kerr.OffsetOutOfRange: // If we are out of range, we reset to what we can.