Skip to content

Commit

Permalink
client: fix broker address parsing
Browse files Browse the repository at this point in the history
Support for both IPv4 and IPv6 broker addresses.

Signed-off-by: Ben Steadman <[email protected]>
  • Loading branch information
SteadBytes committed Aug 12, 2021
1 parent fd889cc commit 37ecd21
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 24 deletions.
78 changes: 54 additions & 24 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ type sinkAndSource struct {
source *source
}

type hostport struct {
host string
port int32
}

const defaultKafkaPort = 9092

// NewClient returns a new Kafka client with the given options or an error if
// the options are invalid. Connections to brokers are lazily created only when
// requests are written to them.
Expand All @@ -124,34 +131,13 @@ func NewClient(opts ...Opt) (*Client, error) {
return nil, err
}

type hostport struct {
host string
port int32
}
seeds := make([]hostport, 0, len(cfg.seedBrokers))
for _, seedBroker := range cfg.seedBrokers {
colon := strings.LastIndexByte(seedBroker, ':')
if colon == -1 { // no port in address, cannot be ipv6, append default port
seedBroker += ":9092"
} else if colon > 0 && seedBroker[colon-1] != ']' { // ipv6 without port (since port would follow bracket)
seedBroker += ":9092"
}

addr, port, err := net.SplitHostPort(seedBroker)
hp, err := parseBrokerAddr(seedBroker)
if err != nil {
return nil, fmt.Errorf("unable to split host port in %q: %w", seedBroker, err)
}

port32, err := strconv.ParseInt(port, 10, 32)
if err != nil {
return nil, fmt.Errorf("unable to parse port in %q: %w", seedBroker, err)
}

if addr == "localhost" {
addr = "127.0.0.1"
return nil, err
}

seeds = append(seeds, hostport{addr, int32(port32)})
seeds = append(seeds, hp)
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -215,6 +201,50 @@ func NewClient(opts ...Opt) (*Client, error) {
return cl, nil
}

// Parse broker IP/host and port from a string, using the default Kafka port if
// unspecified. Supported address formats:
//
// - IPv4 host/IP without port: "127.0.0.1", "localhost"
// - IPv4 host/IP with port: "127.0.0.1:1234", "localhost:1234"
// - IPv6 IP without port: "[2001:1000:2000::1]", "::1"
// - IPv6 IP with port: "[2001:1000:2000::1]:1234"
func parseBrokerAddr(addr string) (hostport, error) {
// Bracketed IPv6
if strings.IndexByte(addr, '[') == 0 {
parts := strings.Split(addr[1:], "]")
if len(parts) != 2 {
return hostport{}, fmt.Errorf("invalid addr: %s", addr)
}
// No port specified -> use default
if len(parts[1]) == 0 {
return hostport{parts[0], defaultKafkaPort}, nil
}
port, err := strconv.ParseInt(parts[1][1:], 10, 32)
if err != nil {
return hostport{}, fmt.Errorf("unable to parse port from addr: %w", err)
}
return hostport{parts[0], int32(port)}, nil
}

// IPv4 with no port
if strings.IndexByte(addr, ':') == -1 {
return hostport{addr, defaultKafkaPort}, nil
}

// Either a IPv6 literal ("::1"), IP:port or host:port
// Try to parse as IP:port or host:port
h, p, err := net.SplitHostPort(addr)
if err != nil {
// IPV6 literal - use default Kafka port
return hostport{addr, defaultKafkaPort}, nil
}
port, err := strconv.ParseInt(p, 10, 32)
if err != nil {
return hostport{}, fmt.Errorf("unable to parse port from addr: %w", err)
}
return hostport{h, int32(port)}, nil
}

func connTimeoutBuilder(def time.Duration) func(kmsg.Request) (time.Duration, time.Duration) {
var joinMu sync.Mutex
var lastRebalanceTimeout time.Duration
Expand Down
95 changes: 95 additions & 0 deletions pkg/kgo/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package kgo

import (
"testing"
)

func TestParseBrokerAddr(t *testing.T) {
tests := []struct {
name string
addr string
expected hostport
}{
{
"IPv4",
"127.0.0.1:1234",
hostport{"127.0.0.1", 1234},
},
{
"IPv4 + default port",
"127.0.0.1",
hostport{"127.0.0.1", 9092},
},
{
"host",
"localhost:1234",
hostport{"localhost", 1234},
},
{
"host + default port",
"localhost",
hostport{"localhost", 9092},
},
{
"IPv6",
"[2001:1000:2000::1]:1234",
hostport{"2001:1000:2000::1", 1234},
},
{
"IPv6 + default port",
"[2001:1000:2000::1]",
hostport{"2001:1000:2000::1", 9092},
},
{
"IPv6 literal",
"::1",
hostport{"::1", 9092},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
result, err := parseBrokerAddr(test.addr)
if err != nil {
t.Fatal(err)
}
if result != test.expected {
t.Fatalf("expected %v, got %v", test.expected, result)
}
})
}
}

func TestParseBrokerAddrErrors(t *testing.T) {
tests := []struct {
name string
addr string
}{
{
"IPv4 invalid port",
"127.0.0.1:foo",
},
{
"host invalid port",
"localhost:foo",
},

{
"IPv6 invalid port",
"[2001:1000:2000::1]:foo",
},
{
"IPv6 missing closing bracket",
"[2001:1000:2000::1:1234",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, err := parseBrokerAddr(test.addr)
if err == nil {
t.Fatal("expected error")
}
})
}
}

0 comments on commit 37ecd21

Please sign in to comment.