From 8da4eaaa73a40695d12c971b5662b4b73d516fe4 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 1 Aug 2021 22:27:55 +0300 Subject: [PATCH] client: enable ipv6 support in seed brokers Signed-off-by: Vasiliy Tolstov --- pkg/kgo/client.go | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 0f62a29b..0fbcb873 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -23,6 +23,7 @@ import ( "fmt" "hash/crc32" "math/rand" + "net" "reflect" "sort" "strconv" @@ -129,22 +130,28 @@ func NewClient(opts ...Opt) (*Client, error) { } seeds := make([]hostport, 0, len(cfg.seedBrokers)) for _, seedBroker := range cfg.seedBrokers { - addr := seedBroker - port := int32(9092) // default kafka port - if colon := strings.IndexByte(addr, ':'); colon > 0 { - port64, err := strconv.ParseInt(addr[colon+1:], 10, 64) - if err != nil { - return nil, fmt.Errorf("unable to parse addr:port in %q", seedBroker) - } - addr = addr[:colon] - port = int32(port64) + colon := strings.LastIndexByte(seedBroker, ':') + if colon == -1 { // no port in address, cannot be ipv6, append default port + seedBroker += ":9092" + } else if colon > 0 && seedBroker[colon-1] != ']' { // ipv6 without port (since port would follow bracket) + seedBroker += ":9092" + } + + addr, port, err := net.SplitHostPort(seedBroker) + if err != nil { + return nil, fmt.Errorf("unable to split host port in %q: %w", seedBroker, err) + } + + port32, err := strconv.ParseInt(port, 10, 32) + if err != nil { + return nil, fmt.Errorf("unable to parse port in %q: %w", seedBroker, err) } if addr == "localhost" { addr = "127.0.0.1" } - seeds = append(seeds, hostport{addr, port}) + seeds = append(seeds, hostport{addr, int32(port32)}) } ctx, cancel := context.WithCancel(context.Background())