Skip to content

Commit

Permalink
kgo.Dialer: breaking change for easier API usage
Browse files Browse the repository at this point in the history
By not including the network in the function passed to Dialer, the API
made it impossible to just pass a {net/tls}.Dialer.DialContext. Doing so
encouraged wrapper closures that would allocate a new dialer for every
dial.

The original API was chosen because I did not want to assume the network
(tcp or udp) that people wanted to use. This new change requires the
network to be tcp, but we can easily add a new option in the future,
DialerNetwork(string), that allows overriding the now-required tcp
network.

This new option makes it easier to use Dialer and matches other places
in the Go stdlib that take dialers (specifically, http.Transport's
DialContext)
  • Loading branch information
twmb committed Oct 6, 2020
1 parent 80f7c73 commit 9687df7
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn,

// connect connects to the broker's addr, returning the new connection.
func (b *broker) connect(ctx context.Context) (net.Conn, error) {
conn, err := b.cl.cfg.dialFn(ctx, b.addr)
conn, err := b.cl.cfg.dialFn(ctx, "tcp", b.addr)
if err != nil {
if _, ok := err.(net.Error); ok {
return nil, ErrNoDial
Expand Down
7 changes: 0 additions & 7 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,6 @@ type Client struct {
metadone chan struct{}
}

// stddialer is the default dialer for dialing connections.
var stddialer = net.Dialer{Timeout: 10 * time.Second}

func stddial(ctx context.Context, addr string) (net.Conn, error) {
return stddialer.DialContext(ctx, "tcp", addr)
}

// NewClient returns a new Kafka client with the given options or an error if
// the options are invalid.
func NewClient(opts ...Opt) (*Client, error) {
Expand Down
16 changes: 13 additions & 3 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (consumerOpt) consumerOpt() {}
type cfg struct {
// ***GENERAL SECTION***
id *string
dialFn func(context.Context, string) (net.Conn, error)
dialFn func(context.Context, string, string) (net.Conn, error)
connTimeoutOverhead time.Duration

softwareName string // KIP-511
Expand Down Expand Up @@ -129,7 +129,7 @@ func defaultCfg() cfg {
defaultID := "kgo"
return cfg{
id: &defaultID,
dialFn: stddial,
dialFn: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,

softwareName: "kgo",
softwareVersion: "0.1.0",
Expand Down Expand Up @@ -260,7 +260,17 @@ func ConnTimeoutOverhead(overhead time.Duration) Opt {
// that caused the dial. If the request is a client-internal request, the
// context is the context on the client itself (which is canceled when the
// client is closed).
func Dialer(fn func(context.Context, string) (net.Conn, error)) Opt {
//
// This function has the same signature as net.Dialer's DialContext and
// tls.Dialer's DialContext, meaning you can use this function like so:
//
// kgo.Dialer((&net.Dialer{Timeout: 10*time.Second}).DialContext)
//
// or
//
// kgo.Dialer((&tls.Dialer{...})}.DialContext)
//
func Dialer(fn func(ctx context.Context, network, host string) (net.Conn, error)) Opt {
return clientOpt{func(cfg *cfg) { cfg.dialFn = fn }}
}

Expand Down

0 comments on commit 9687df7

Please sign in to comment.