Skip to content

Commit

Permalink
kgo.Client: add UpdateSeedBrokers(...string) error
Browse files Browse the repository at this point in the history
This allows an end user to update seed brokers, which can be valuable in
long lived clients if the original set of seeds has completely rotated
and no longer exist.

This satisfies KIP-899, allowing the client to "rebootstrap".

Closes #316.
  • Loading branch information
twmb committed Feb 8, 2023
1 parent 3a2dea6 commit c5f86ea
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 21 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ generation.
| [KIP-841](https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft)`AlterPartition.TopicID` | 3.3 | Supported |
| [KIP-866](https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration) — ZK to Raft RPC changes | 3.4 | Supported |
| [KIP-893](https://cwiki.apache.org/confluence/display/KAFKA/KIP-893%3A+The+Kafka+protocol+should+support+nullable+structs) — Nullable structs in the protocol | 3.5 | Supported |
| [KIP-899](https://cwiki.apache.org/confluence/display/KAFKA/KIP-899%3A+Allow+clients+to+rebootstrap) — Allow clients to rebootstrap | ? | Supported (`UpdateSeedBrokers`) |

Missing from above but included in librdkafka is:

Expand Down
5 changes: 3 additions & 2 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,9 +559,10 @@ func (cl *Client) reapConnectionsLoop() {

func (cl *Client) reapConnections(idleTimeout time.Duration) (total int) {
cl.brokersMu.Lock()
brokers := make([]*broker, 0, len(cl.brokers)+len(cl.seeds))
seeds := cl.loadSeeds()
brokers := make([]*broker, 0, len(cl.brokers)+len(seeds))
brokers = append(brokers, cl.brokers...)
brokers = append(brokers, cl.seeds...)
brokers = append(brokers, seeds...)
cl.brokersMu.Unlock()

for _, broker := range brokers {
Expand Down
87 changes: 69 additions & 18 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/twmb/franz-go/pkg/kerr"
Expand All @@ -39,8 +40,8 @@ type Client struct {
rng func() float64

brokersMu sync.RWMutex
brokers []*broker // ordered by broker ID
seeds []*broker // seed brokers, also ordered by ID
brokers []*broker // ordered by broker ID
seeds atomic.Value // []*broker, seed brokers, also ordered by ID
anyBrokerIdx int32
anySeedIdx int32
stopBrokers bool // set to true on close to stop updateBrokers
Expand Down Expand Up @@ -118,6 +119,18 @@ func ValidateOpts(opts ...Opt) error {
return err
}

func parseSeeds(addrs []string) ([]hostport, error) {
seeds := make([]hostport, 0, len(addrs))
for _, seedBroker := range addrs {
hp, err := parseBrokerAddr(seedBroker)
if err != nil {
return nil, err
}
seeds = append(seeds, hp)
}
return seeds, nil
}

// This function validates the configuration and returns a few things that we
// initialize while validating. The difference between this and NewClient
// initialization is all NewClient initialization is infallible.
Expand All @@ -129,13 +142,9 @@ func validateCfg(opts ...Opt) (cfg, []hostport, *compressor, error) {
if err := cfg.validate(); err != nil {
return cfg, nil, nil, err
}
seeds := make([]hostport, 0, len(cfg.seedBrokers))
for _, seedBroker := range cfg.seedBrokers {
hp, err := parseBrokerAddr(seedBroker)
if err != nil {
return cfg, nil, nil, err
}
seeds = append(seeds, hp)
seeds, err := parseSeeds(cfg.seedBrokers)
if err != nil {
return cfg, nil, nil, err
}
compressor, err := newCompressor(cfg.compression...)
if err != nil {
Expand Down Expand Up @@ -249,17 +258,22 @@ func NewClient(opts ...Opt) (*Client, error) {
cl.reqFormatter = kmsg.NewRequestFormatter(kmsg.FormatterClientID(*cfg.id))
}

seedBrokers := make([]*broker, 0, len(seeds))
for i, seed := range seeds {
b := cl.newBroker(unknownSeedID(i), seed.host, seed.port, nil)
cl.seeds = append(cl.seeds, b)
seedBrokers = append(seedBrokers, b)
}
sort.Slice(cl.seeds, func(i, j int) bool { return cl.seeds[i].meta.NodeID < cl.seeds[j].meta.NodeID })
cl.seeds.Store(seedBrokers)
go cl.updateMetadataLoop()
go cl.reapConnectionsLoop()

return cl, nil
}

func (cl *Client) loadSeeds() []*broker {
return cl.seeds.Load().([]*broker)
}

// Ping returns whether any broker is reachable, iterating over any discovered
// broker or seed broker until one returns a successful response to an
// ApiVersions request. No discovered broker nor seed broker is attempted more
Expand All @@ -276,7 +290,7 @@ func (cl *Client) Ping(ctx context.Context) error {
var lastErr error
for _, brs := range [2][]*broker{
brokers,
cl.seeds,
cl.loadSeeds(),
} {
for _, br := range brs {
_, err := br.waitResp(ctx, req)
Expand Down Expand Up @@ -445,8 +459,9 @@ func (cl *Client) broker() *broker {
b = cl.brokers[cl.anyBrokerIdx]
cl.anyBrokerIdx++
} else {
cl.anySeedIdx %= int32(len(cl.seeds))
b = cl.seeds[cl.anySeedIdx]
seeds := cl.loadSeeds()
cl.anySeedIdx %= int32(len(seeds))
b = seeds[cl.anySeedIdx]
cl.anySeedIdx++

// If we have brokers, we ranged past discovered brokers.
Expand Down Expand Up @@ -508,7 +523,7 @@ func (cl *Client) supportsKeyVersion(key, version int16) bool {

for _, brokers := range [][]*broker{
cl.brokers,
cl.seeds,
cl.loadSeeds(),
} {
for _, b := range brokers {
if v := b.loadVersions(); v != nil && v.versions[key] >= version {
Expand Down Expand Up @@ -686,7 +701,7 @@ func (cl *Client) Close() {
broker.stopForever()
}
cl.brokersMu.Unlock()
for _, broker := range cl.seeds {
for _, broker := range cl.loadSeeds() {
broker.stopForever()
}

Expand Down Expand Up @@ -1063,7 +1078,7 @@ func (cl *Client) brokerOrErr(ctx context.Context, id int32, err error) (*broker
start:
var broker *broker
if id < 0 {
broker = findBroker(cl.seeds, id)
broker = findBroker(cl.loadSeeds(), id)
} else {
cl.brokersMu.RLock()
broker = findBroker(cl.brokers, id)
Expand Down Expand Up @@ -1569,12 +1584,48 @@ func (cl *Client) DiscoveredBrokers() []*Broker {
// SeedBrokers returns the all seed brokers.
func (cl *Client) SeedBrokers() []*Broker {
var bs []*Broker
for _, broker := range cl.seeds {
for _, broker := range cl.loadSeeds() {
bs = append(bs, &Broker{id: broker.meta.NodeID, cl: cl})
}
return bs
}

// UpdateSeedBrokers updates the client's list of seed brokers. Over the course
// of a long period of time, your might replace all brokers that you originally
// specified as seeds. This command allows you to replace the client's list of
// seeds.
//
// This returns an error if any of the input addrs is not a host:port. If the
// input list is empty, the function returns without replacing the seeds.
func (cl *Client) UpdateSeedBrokers(addrs ...string) error {
if len(addrs) == 0 {
return nil
}
seeds, err := parseSeeds(addrs)
if err != nil {
return err
}

seedBrokers := make([]*broker, 0, len(seeds))
for i, seed := range seeds {
b := cl.newBroker(unknownSeedID(i), seed.host, seed.port, nil)
seedBrokers = append(seedBrokers, b)
}

// We lock to guard against concurrently updating seeds; we do not need
// the lock for what this usually guards.
cl.brokersMu.Lock()
old := cl.loadSeeds()
cl.seeds.Store(seedBrokers)
cl.brokersMu.Unlock()

for _, b := range old {
b.stopForever()
}

return nil
}

// Broker pairs a broker ID with a client to directly issue requests to a
// specific broker.
type Broker struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1703,7 +1703,7 @@ func (s *consumerSession) mapLoadsToBrokers(loads listOrEpochLoads) map[*broker]
defer s.c.cl.brokersMu.RUnlock()

brokers := s.c.cl.brokers
seed := s.c.cl.seeds[0]
seed := s.c.cl.loadSeeds()[0]

topics := s.tps.load()
for _, loads := range []struct {
Expand Down

0 comments on commit c5f86ea

Please sign in to comment.