From 37ecd2145a08d40c986ec2122892fa874852529d Mon Sep 17 00:00:00 2001 From: Ben Steadman Date: Thu, 12 Aug 2021 12:07:15 +0100 Subject: [PATCH] client: fix broker address parsing Support for both IPv4 and IPv6 broker addresses. Signed-off-by: Ben Steadman --- pkg/kgo/client.go | 78 +++++++++++++++++++++++----------- pkg/kgo/client_test.go | 95 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 149 insertions(+), 24 deletions(-) create mode 100644 pkg/kgo/client_test.go diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 7469df5d..34a33019 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -102,6 +102,13 @@ type sinkAndSource struct { source *source } +type hostport struct { + host string + port int32 +} + +const defaultKafkaPort = 9092 + // NewClient returns a new Kafka client with the given options or an error if // the options are invalid. Connections to brokers are lazily created only when // requests are written to them. @@ -124,34 +131,13 @@ func NewClient(opts ...Opt) (*Client, error) { return nil, err } - type hostport struct { - host string - port int32 - } seeds := make([]hostport, 0, len(cfg.seedBrokers)) for _, seedBroker := range cfg.seedBrokers { - colon := strings.LastIndexByte(seedBroker, ':') - if colon == -1 { // no port in address, cannot be ipv6, append default port - seedBroker += ":9092" - } else if colon > 0 && seedBroker[colon-1] != ']' { // ipv6 without port (since port would follow bracket) - seedBroker += ":9092" - } - - addr, port, err := net.SplitHostPort(seedBroker) + hp, err := parseBrokerAddr(seedBroker) if err != nil { - return nil, fmt.Errorf("unable to split host port in %q: %w", seedBroker, err) - } - - port32, err := strconv.ParseInt(port, 10, 32) - if err != nil { - return nil, fmt.Errorf("unable to parse port in %q: %w", seedBroker, err) - } - - if addr == "localhost" { - addr = "127.0.0.1" + return nil, err } - - seeds = append(seeds, hostport{addr, int32(port32)}) + seeds = append(seeds, hp) } ctx, cancel := context.WithCancel(context.Background()) @@ -215,6 +201,50 @@ func NewClient(opts ...Opt) (*Client, error) { return cl, nil } +// Parse broker IP/host and port from a string, using the default Kafka port if +// unspecified. Supported address formats: +// +// - IPv4 host/IP without port: "127.0.0.1", "localhost" +// - IPv4 host/IP with port: "127.0.0.1:1234", "localhost:1234" +// - IPv6 IP without port: "[2001:1000:2000::1]", "::1" +// - IPv6 IP with port: "[2001:1000:2000::1]:1234" +func parseBrokerAddr(addr string) (hostport, error) { + // Bracketed IPv6 + if strings.IndexByte(addr, '[') == 0 { + parts := strings.Split(addr[1:], "]") + if len(parts) != 2 { + return hostport{}, fmt.Errorf("invalid addr: %s", addr) + } + // No port specified -> use default + if len(parts[1]) == 0 { + return hostport{parts[0], defaultKafkaPort}, nil + } + port, err := strconv.ParseInt(parts[1][1:], 10, 32) + if err != nil { + return hostport{}, fmt.Errorf("unable to parse port from addr: %w", err) + } + return hostport{parts[0], int32(port)}, nil + } + + // IPv4 with no port + if strings.IndexByte(addr, ':') == -1 { + return hostport{addr, defaultKafkaPort}, nil + } + + // Either a IPv6 literal ("::1"), IP:port or host:port + // Try to parse as IP:port or host:port + h, p, err := net.SplitHostPort(addr) + if err != nil { + // IPV6 literal - use default Kafka port + return hostport{addr, defaultKafkaPort}, nil + } + port, err := strconv.ParseInt(p, 10, 32) + if err != nil { + return hostport{}, fmt.Errorf("unable to parse port from addr: %w", err) + } + return hostport{h, int32(port)}, nil +} + func connTimeoutBuilder(def time.Duration) func(kmsg.Request) (time.Duration, time.Duration) { var joinMu sync.Mutex var lastRebalanceTimeout time.Duration diff --git a/pkg/kgo/client_test.go b/pkg/kgo/client_test.go new file mode 100644 index 00000000..c1c43516 --- /dev/null +++ b/pkg/kgo/client_test.go @@ -0,0 +1,95 @@ +package kgo + +import ( + "testing" +) + +func TestParseBrokerAddr(t *testing.T) { + tests := []struct { + name string + addr string + expected hostport + }{ + { + "IPv4", + "127.0.0.1:1234", + hostport{"127.0.0.1", 1234}, + }, + { + "IPv4 + default port", + "127.0.0.1", + hostport{"127.0.0.1", 9092}, + }, + { + "host", + "localhost:1234", + hostport{"localhost", 1234}, + }, + { + "host + default port", + "localhost", + hostport{"localhost", 9092}, + }, + { + "IPv6", + "[2001:1000:2000::1]:1234", + hostport{"2001:1000:2000::1", 1234}, + }, + { + "IPv6 + default port", + "[2001:1000:2000::1]", + hostport{"2001:1000:2000::1", 9092}, + }, + { + "IPv6 literal", + "::1", + hostport{"::1", 9092}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + result, err := parseBrokerAddr(test.addr) + if err != nil { + t.Fatal(err) + } + if result != test.expected { + t.Fatalf("expected %v, got %v", test.expected, result) + } + }) + } +} + +func TestParseBrokerAddrErrors(t *testing.T) { + tests := []struct { + name string + addr string + }{ + { + "IPv4 invalid port", + "127.0.0.1:foo", + }, + { + "host invalid port", + "localhost:foo", + }, + + { + "IPv6 invalid port", + "[2001:1000:2000::1]:foo", + }, + { + "IPv6 missing closing bracket", + "[2001:1000:2000::1:1234", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + _, err := parseBrokerAddr(test.addr) + if err == nil { + t.Fatal("expected error") + } + }) + } +}