diff --git a/README.md b/README.md index 45d106e2..001fb4e7 100644 --- a/README.md +++ b/README.md @@ -393,6 +393,7 @@ generation. | [KIP-841](https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft) — `AlterPartition.TopicID` | 3.3 | Supported | | [KIP-866](https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration) — ZK to Raft RPC changes | 3.4 | Supported | | [KIP-893](https://cwiki.apache.org/confluence/display/KAFKA/KIP-893%3A+The+Kafka+protocol+should+support+nullable+structs) — Nullable structs in the protocol | 3.5 | Supported | +| [KIP-899](https://cwiki.apache.org/confluence/display/KAFKA/KIP-899%3A+Allow+clients+to+rebootstrap) — Allow clients to rebootstrap | ? | Supported (`UpdateSeedBrokers`) | Missing from above but included in librdkafka is: diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index cd525078..d4f27e0c 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -559,9 +559,10 @@ func (cl *Client) reapConnectionsLoop() { func (cl *Client) reapConnections(idleTimeout time.Duration) (total int) { cl.brokersMu.Lock() - brokers := make([]*broker, 0, len(cl.brokers)+len(cl.seeds)) + seeds := cl.loadSeeds() + brokers := make([]*broker, 0, len(cl.brokers)+len(seeds)) brokers = append(brokers, cl.brokers...) - brokers = append(brokers, cl.seeds...) + brokers = append(brokers, seeds...) cl.brokersMu.Unlock() for _, broker := range brokers { diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index d0d60e01..9a401ac7 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -20,6 +20,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/twmb/franz-go/pkg/kerr" @@ -39,8 +40,8 @@ type Client struct { rng func() float64 brokersMu sync.RWMutex - brokers []*broker // ordered by broker ID - seeds []*broker // seed brokers, also ordered by ID + brokers []*broker // ordered by broker ID + seeds atomic.Value // []*broker, seed brokers, also ordered by ID anyBrokerIdx int32 anySeedIdx int32 stopBrokers bool // set to true on close to stop updateBrokers @@ -118,6 +119,18 @@ func ValidateOpts(opts ...Opt) error { return err } +func parseSeeds(addrs []string) ([]hostport, error) { + seeds := make([]hostport, 0, len(addrs)) + for _, seedBroker := range addrs { + hp, err := parseBrokerAddr(seedBroker) + if err != nil { + return nil, err + } + seeds = append(seeds, hp) + } + return seeds, nil +} + // This function validates the configuration and returns a few things that we // initialize while validating. The difference between this and NewClient // initialization is all NewClient initialization is infallible. @@ -129,13 +142,9 @@ func validateCfg(opts ...Opt) (cfg, []hostport, *compressor, error) { if err := cfg.validate(); err != nil { return cfg, nil, nil, err } - seeds := make([]hostport, 0, len(cfg.seedBrokers)) - for _, seedBroker := range cfg.seedBrokers { - hp, err := parseBrokerAddr(seedBroker) - if err != nil { - return cfg, nil, nil, err - } - seeds = append(seeds, hp) + seeds, err := parseSeeds(cfg.seedBrokers) + if err != nil { + return cfg, nil, nil, err } compressor, err := newCompressor(cfg.compression...) if err != nil { @@ -249,17 +258,22 @@ func NewClient(opts ...Opt) (*Client, error) { cl.reqFormatter = kmsg.NewRequestFormatter(kmsg.FormatterClientID(*cfg.id)) } + seedBrokers := make([]*broker, 0, len(seeds)) for i, seed := range seeds { b := cl.newBroker(unknownSeedID(i), seed.host, seed.port, nil) - cl.seeds = append(cl.seeds, b) + seedBrokers = append(seedBrokers, b) } - sort.Slice(cl.seeds, func(i, j int) bool { return cl.seeds[i].meta.NodeID < cl.seeds[j].meta.NodeID }) + cl.seeds.Store(seedBrokers) go cl.updateMetadataLoop() go cl.reapConnectionsLoop() return cl, nil } +func (cl *Client) loadSeeds() []*broker { + return cl.seeds.Load().([]*broker) +} + // Ping returns whether any broker is reachable, iterating over any discovered // broker or seed broker until one returns a successful response to an // ApiVersions request. No discovered broker nor seed broker is attempted more @@ -276,7 +290,7 @@ func (cl *Client) Ping(ctx context.Context) error { var lastErr error for _, brs := range [2][]*broker{ brokers, - cl.seeds, + cl.loadSeeds(), } { for _, br := range brs { _, err := br.waitResp(ctx, req) @@ -445,8 +459,9 @@ func (cl *Client) broker() *broker { b = cl.brokers[cl.anyBrokerIdx] cl.anyBrokerIdx++ } else { - cl.anySeedIdx %= int32(len(cl.seeds)) - b = cl.seeds[cl.anySeedIdx] + seeds := cl.loadSeeds() + cl.anySeedIdx %= int32(len(seeds)) + b = seeds[cl.anySeedIdx] cl.anySeedIdx++ // If we have brokers, we ranged past discovered brokers. @@ -508,7 +523,7 @@ func (cl *Client) supportsKeyVersion(key, version int16) bool { for _, brokers := range [][]*broker{ cl.brokers, - cl.seeds, + cl.loadSeeds(), } { for _, b := range brokers { if v := b.loadVersions(); v != nil && v.versions[key] >= version { @@ -686,7 +701,7 @@ func (cl *Client) Close() { broker.stopForever() } cl.brokersMu.Unlock() - for _, broker := range cl.seeds { + for _, broker := range cl.loadSeeds() { broker.stopForever() } @@ -1063,7 +1078,7 @@ func (cl *Client) brokerOrErr(ctx context.Context, id int32, err error) (*broker start: var broker *broker if id < 0 { - broker = findBroker(cl.seeds, id) + broker = findBroker(cl.loadSeeds(), id) } else { cl.brokersMu.RLock() broker = findBroker(cl.brokers, id) @@ -1569,12 +1584,48 @@ func (cl *Client) DiscoveredBrokers() []*Broker { // SeedBrokers returns the all seed brokers. func (cl *Client) SeedBrokers() []*Broker { var bs []*Broker - for _, broker := range cl.seeds { + for _, broker := range cl.loadSeeds() { bs = append(bs, &Broker{id: broker.meta.NodeID, cl: cl}) } return bs } +// UpdateSeedBrokers updates the client's list of seed brokers. Over the course +// of a long period of time, your might replace all brokers that you originally +// specified as seeds. This command allows you to replace the client's list of +// seeds. +// +// This returns an error if any of the input addrs is not a host:port. If the +// input list is empty, the function returns without replacing the seeds. +func (cl *Client) UpdateSeedBrokers(addrs ...string) error { + if len(addrs) == 0 { + return nil + } + seeds, err := parseSeeds(addrs) + if err != nil { + return err + } + + seedBrokers := make([]*broker, 0, len(seeds)) + for i, seed := range seeds { + b := cl.newBroker(unknownSeedID(i), seed.host, seed.port, nil) + seedBrokers = append(seedBrokers, b) + } + + // We lock to guard against concurrently updating seeds; we do not need + // the lock for what this usually guards. + cl.brokersMu.Lock() + old := cl.loadSeeds() + cl.seeds.Store(seedBrokers) + cl.brokersMu.Unlock() + + for _, b := range old { + b.stopForever() + } + + return nil +} + // Broker pairs a broker ID with a client to directly issue requests to a // specific broker. type Broker struct { diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index bc9ae070..b1d0f2e5 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -1703,7 +1703,7 @@ func (s *consumerSession) mapLoadsToBrokers(loads listOrEpochLoads) map[*broker] defer s.c.cl.brokersMu.RUnlock() brokers := s.c.cl.brokers - seed := s.c.cl.seeds[0] + seed := s.c.cl.loadSeeds()[0] topics := s.tps.load() for _, loads := range []struct {