Skip to content

Commit

Permalink
Merge pull request #2256 from Stephan14/retry_get_metadata
Browse files Browse the repository at this point in the history
fix: fix metadata retry backoff invalid when get metadata failed
  • Loading branch information
dnwe authored Jun 20, 2022
2 parents 23c4286 + efa83c4 commit 7a92423
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
17 changes: 16 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -133,6 +134,8 @@ type client struct {
cachedPartitionsResults map[string][maxPartitionIndex][]int32

lock sync.RWMutex // protects access to the maps that hold cluster state.

updateMetaDataMs int64 //store update metadata time
}

// NewClient creates a new Client. It connects to one of the given broker addresses
Expand Down Expand Up @@ -877,10 +880,16 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
return err
}
Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
if backoff > 0 {
time.Sleep(backoff)
}

t := atomic.LoadInt64(&client.updateMetaDataMs)
if time.Since(time.Unix(t/1e3, 0)) < backoff {
return err
}
Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)

return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline)
}
return err
Expand All @@ -903,6 +912,12 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
} else if client.conf.Version.IsAtLeast(V0_10_0_0) {
req.Version = 1
}

t := atomic.LoadInt64(&client.updateMetaDataMs)
if !atomic.CompareAndSwapInt64(&client.updateMetaDataMs, t, time.Now().UnixNano()/int64(time.Millisecond)) {
return nil
}

response, err := broker.GetMetadata(req)
var kerror KError
var packetEncodingError PacketEncodingError
Expand Down
38 changes: 38 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,44 @@ func TestClientMetadataTimeout(t *testing.T) {
}
}

func TestClientUpdateMetadataErrorAndRetry(t *testing.T) {
seedBroker := NewMockBroker(t, 1)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(seedBroker.Addr(), 1)
seedBroker.Returns(metadataResponse1)

config := NewTestConfig()
config.Metadata.Retry.Max = 3
config.Metadata.Retry.Backoff = 200 * time.Millisecond
config.Metadata.RefreshFrequency = 0
config.Net.ReadTimeout = 10 * time.Millisecond
config.Net.WriteTimeout = 10 * time.Millisecond
client, err := NewClient([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}
waitGroup := sync.WaitGroup{}
waitGroup.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer waitGroup.Done()
var failedMetadataResponse MetadataResponse
failedMetadataResponse.AddBroker(seedBroker.Addr(), 1)
failedMetadataResponse.AddTopic("new_topic", ErrUnknownTopicOrPartition)
seedBroker.Returns(&failedMetadataResponse)
err := client.RefreshMetadata()
if err == nil {
t.Error("should return error")
return
}
}()
}
waitGroup.Wait()
safeClose(t, client)
seedBroker.Close()
}

func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
staleCoordinator := NewMockBroker(t, 2)
Expand Down

0 comments on commit 7a92423

Please sign in to comment.