From c5f86eaa7a66153360464993bc2943dec6489515 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Wed, 8 Feb 2023 01:07:49 -0700 Subject: [PATCH] kgo.Client: add UpdateSeedBrokers(...string) error This allows an end user to update seed brokers, which can be valuable in long lived clients if the original set of seeds has completely rotated and no longer exist. This satisfies KIP-899, allowing the client to "rebootstrap". Closes #316. --- README.md | 1 + pkg/kgo/broker.go | 5 +-- pkg/kgo/client.go | 87 +++++++++++++++++++++++++++++++++++---------- pkg/kgo/consumer.go | 2 +- 4 files changed, 74 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 45d106e2..001fb4e7 100644 --- a/README.md +++ b/README.md @@ -393,6 +393,7 @@ generation. | [KIP-841](https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft) — `AlterPartition.TopicID` | 3.3 | Supported | | [KIP-866](https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration) — ZK to Raft RPC changes | 3.4 | Supported | | [KIP-893](https://cwiki.apache.org/confluence/display/KAFKA/KIP-893%3A+The+Kafka+protocol+should+support+nullable+structs) — Nullable structs in the protocol | 3.5 | Supported | +| [KIP-899](https://cwiki.apache.org/confluence/display/KAFKA/KIP-899%3A+Allow+clients+to+rebootstrap) — Allow clients to rebootstrap | ? | Supported (`UpdateSeedBrokers`) | Missing from above but included in librdkafka is: diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index cd525078..d4f27e0c 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -559,9 +559,10 @@ func (cl *Client) reapConnectionsLoop() { func (cl *Client) reapConnections(idleTimeout time.Duration) (total int) { cl.brokersMu.Lock() - brokers := make([]*broker, 0, len(cl.brokers)+len(cl.seeds)) + seeds := cl.loadSeeds() + brokers := make([]*broker, 0, len(cl.brokers)+len(seeds)) brokers = append(brokers, cl.brokers...) - brokers = append(brokers, cl.seeds...) + brokers = append(brokers, seeds...) cl.brokersMu.Unlock() for _, broker := range brokers { diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index d0d60e01..9a401ac7 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -20,6 +20,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/twmb/franz-go/pkg/kerr" @@ -39,8 +40,8 @@ type Client struct { rng func() float64 brokersMu sync.RWMutex - brokers []*broker // ordered by broker ID - seeds []*broker // seed brokers, also ordered by ID + brokers []*broker // ordered by broker ID + seeds atomic.Value // []*broker, seed brokers, also ordered by ID anyBrokerIdx int32 anySeedIdx int32 stopBrokers bool // set to true on close to stop updateBrokers @@ -118,6 +119,18 @@ func ValidateOpts(opts ...Opt) error { return err } +func parseSeeds(addrs []string) ([]hostport, error) { + seeds := make([]hostport, 0, len(addrs)) + for _, seedBroker := range addrs { + hp, err := parseBrokerAddr(seedBroker) + if err != nil { + return nil, err + } + seeds = append(seeds, hp) + } + return seeds, nil +} + // This function validates the configuration and returns a few things that we // initialize while validating. The difference between this and NewClient // initialization is all NewClient initialization is infallible. @@ -129,13 +142,9 @@ func validateCfg(opts ...Opt) (cfg, []hostport, *compressor, error) { if err := cfg.validate(); err != nil { return cfg, nil, nil, err } - seeds := make([]hostport, 0, len(cfg.seedBrokers)) - for _, seedBroker := range cfg.seedBrokers { - hp, err := parseBrokerAddr(seedBroker) - if err != nil { - return cfg, nil, nil, err - } - seeds = append(seeds, hp) + seeds, err := parseSeeds(cfg.seedBrokers) + if err != nil { + return cfg, nil, nil, err } compressor, err := newCompressor(cfg.compression...) if err != nil { @@ -249,17 +258,22 @@ func NewClient(opts ...Opt) (*Client, error) { cl.reqFormatter = kmsg.NewRequestFormatter(kmsg.FormatterClientID(*cfg.id)) } + seedBrokers := make([]*broker, 0, len(seeds)) for i, seed := range seeds { b := cl.newBroker(unknownSeedID(i), seed.host, seed.port, nil) - cl.seeds = append(cl.seeds, b) + seedBrokers = append(seedBrokers, b) } - sort.Slice(cl.seeds, func(i, j int) bool { return cl.seeds[i].meta.NodeID < cl.seeds[j].meta.NodeID }) + cl.seeds.Store(seedBrokers) go cl.updateMetadataLoop() go cl.reapConnectionsLoop() return cl, nil } +func (cl *Client) loadSeeds() []*broker { + return cl.seeds.Load().([]*broker) +} + // Ping returns whether any broker is reachable, iterating over any discovered // broker or seed broker until one returns a successful response to an // ApiVersions request. No discovered broker nor seed broker is attempted more @@ -276,7 +290,7 @@ func (cl *Client) Ping(ctx context.Context) error { var lastErr error for _, brs := range [2][]*broker{ brokers, - cl.seeds, + cl.loadSeeds(), } { for _, br := range brs { _, err := br.waitResp(ctx, req) @@ -445,8 +459,9 @@ func (cl *Client) broker() *broker { b = cl.brokers[cl.anyBrokerIdx] cl.anyBrokerIdx++ } else { - cl.anySeedIdx %= int32(len(cl.seeds)) - b = cl.seeds[cl.anySeedIdx] + seeds := cl.loadSeeds() + cl.anySeedIdx %= int32(len(seeds)) + b = seeds[cl.anySeedIdx] cl.anySeedIdx++ // If we have brokers, we ranged past discovered brokers. @@ -508,7 +523,7 @@ func (cl *Client) supportsKeyVersion(key, version int16) bool { for _, brokers := range [][]*broker{ cl.brokers, - cl.seeds, + cl.loadSeeds(), } { for _, b := range brokers { if v := b.loadVersions(); v != nil && v.versions[key] >= version { @@ -686,7 +701,7 @@ func (cl *Client) Close() { broker.stopForever() } cl.brokersMu.Unlock() - for _, broker := range cl.seeds { + for _, broker := range cl.loadSeeds() { broker.stopForever() } @@ -1063,7 +1078,7 @@ func (cl *Client) brokerOrErr(ctx context.Context, id int32, err error) (*broker start: var broker *broker if id < 0 { - broker = findBroker(cl.seeds, id) + broker = findBroker(cl.loadSeeds(), id) } else { cl.brokersMu.RLock() broker = findBroker(cl.brokers, id) @@ -1569,12 +1584,48 @@ func (cl *Client) DiscoveredBrokers() []*Broker { // SeedBrokers returns the all seed brokers. func (cl *Client) SeedBrokers() []*Broker { var bs []*Broker - for _, broker := range cl.seeds { + for _, broker := range cl.loadSeeds() { bs = append(bs, &Broker{id: broker.meta.NodeID, cl: cl}) } return bs } +// UpdateSeedBrokers updates the client's list of seed brokers. Over the course +// of a long period of time, your might replace all brokers that you originally +// specified as seeds. This command allows you to replace the client's list of +// seeds. +// +// This returns an error if any of the input addrs is not a host:port. If the +// input list is empty, the function returns without replacing the seeds. +func (cl *Client) UpdateSeedBrokers(addrs ...string) error { + if len(addrs) == 0 { + return nil + } + seeds, err := parseSeeds(addrs) + if err != nil { + return err + } + + seedBrokers := make([]*broker, 0, len(seeds)) + for i, seed := range seeds { + b := cl.newBroker(unknownSeedID(i), seed.host, seed.port, nil) + seedBrokers = append(seedBrokers, b) + } + + // We lock to guard against concurrently updating seeds; we do not need + // the lock for what this usually guards. + cl.brokersMu.Lock() + old := cl.loadSeeds() + cl.seeds.Store(seedBrokers) + cl.brokersMu.Unlock() + + for _, b := range old { + b.stopForever() + } + + return nil +} + // Broker pairs a broker ID with a client to directly issue requests to a // specific broker. type Broker struct { diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index bc9ae070..b1d0f2e5 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -1703,7 +1703,7 @@ func (s *consumerSession) mapLoadsToBrokers(loads listOrEpochLoads) map[*broker] defer s.c.cl.brokersMu.RUnlock() brokers := s.c.cl.brokers - seed := s.c.cl.seeds[0] + seed := s.c.cl.loadSeeds()[0] topics := s.tps.load() for _, loads := range []struct {