Skip to content

Commit

Permalink
kgo: always strip retryable errors when consuming
Browse files Browse the repository at this point in the history
UnknownTopicID is now returned frequently on Kafka 3.4, apparently
because topic state is not synced immediately on all brokers. This was
causing TestIssue337 to fail due to an unexpected error in the fetch
responses.

Now, we just strip all retryable errors.

We also improve the test a bit: it now accumulates records across polls
and fails on any unexpected fetch error.
  • Loading branch information
twmb committed Feb 8, 2023
1 parent 1fee9a9 commit 17567b0
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 25 deletions.
33 changes: 23 additions & 10 deletions pkg/kgo/consumer_direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
)

func TestIssue325(t *testing.T) {
t.Parallel()

topic, cleanup := tmpTopic(t)
defer cleanup()

Expand All @@ -28,6 +30,8 @@ func TestIssue325(t *testing.T) {
}

func TestIssue337(t *testing.T) {
t.Parallel()

topic, cleanup := tmpTopicPartitions(t, 2)
defer cleanup()

Expand All @@ -49,21 +53,30 @@ func TestIssue337(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
var recs []*Record
out:
for {
fs := cl.PollFetches(ctx)
if err := fs.Err0(); err == context.DeadlineExceeded {
break
}
recs := fs.Records()
if len(recs) != 1 {
t.Fatalf("incorrect number of records, saw: %v", len(recs))
} else if string(recs[0].Value) != "foo" {
t.Fatalf("wrong value, got: %s", recs[0].Value)
switch err := fs.Err0(); err {
default:
t.Fatalf("unexpected error: %v", err)
case context.DeadlineExceeded:
break out
case nil:
}
recs = append(recs, fs.Records()...)
}
if len(recs) != 1 {
t.Fatalf("incorrect number of records, saw: %v", len(recs))
}
if string(recs[0].Value) != "foo" {
t.Fatalf("wrong value, got: %s", recs[0].Value)
}
}

func TestDirectPartitionPurge(t *testing.T) {
t.Parallel()

topic, cleanup := tmpTopicPartitions(t, 2)
defer cleanup()

Expand All @@ -84,15 +97,15 @@ func TestDirectPartitionPurge(t *testing.T) {
}
cl.PurgeTopicsFromClient(topic)

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
fs := cl.PollFetches(ctx)
cancel()
if err := fs.Err0(); err != context.DeadlineExceeded {
t.Fatal("unexpected success when expecting context.DeadlineExceeded")
}

cl.AddConsumeTopics(topic)
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

exp := map[string]bool{
Expand Down
29 changes: 14 additions & 15 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,21 +839,20 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
var keep bool
switch fp.Err {
default:
// - bad auth
// - unsupported compression
// - unsupported message version
// - unknown error
// - or, no error
keep = true

case kerr.UnknownTopicOrPartition,
kerr.NotLeaderForPartition,
kerr.ReplicaNotAvailable,
kerr.KafkaStorageError,
kerr.UnknownLeaderEpoch, // our meta is newer than broker we fetched from
kerr.OffsetNotAvailable: // fetched from out of sync replica or a behind in-sync one (KIP-392: case 1 and case 2)

numErrsStripped++
if kerr.IsRetriable(fp.Err) {
// UnknownLeaderEpoch: our meta is newer than the broker we fetched from
// OffsetNotAvailable: fetched from out of sync replica or a behind in-sync one (KIP-392 case 1 and case 2)
// UnknownTopicID: kafka has not synced the state on all brokers
// And other standard retryable errors.
numErrsStripped++
} else {
// - bad auth
// - unsupported compression
// - unsupported message version
// - unknown error
// - or, no error
keep = true
}

case kerr.OffsetOutOfRange:
// If we are out of range, we reset to what we can.
Expand Down

0 comments on commit 17567b0

Please sign in to comment.