diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index da530c27..f7dd0e77 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -362,7 +362,7 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn, // connect connects to the broker's addr, returning the new connection. func (b *broker) connect(ctx context.Context) (net.Conn, error) { - conn, err := b.cl.cfg.dialFn(ctx, b.addr) + conn, err := b.cl.cfg.dialFn(ctx, "tcp", b.addr) if err != nil { if _, ok := err.(net.Error); ok { return nil, ErrNoDial diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 90024b7c..44107491 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -78,13 +78,6 @@ type Client struct { metadone chan struct{} } -// stddialer is the default dialer for dialing connections. -var stddialer = net.Dialer{Timeout: 10 * time.Second} - -func stddial(ctx context.Context, addr string) (net.Conn, error) { - return stddialer.DialContext(ctx, "tcp", addr) -} - // NewClient returns a new Kafka client with the given options or an error if // the options are invalid. func NewClient(opts ...Opt) (*Client, error) { diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index b0816f40..7bbd491a 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -46,7 +46,7 @@ func (consumerOpt) consumerOpt() {} type cfg struct { // ***GENERAL SECTION*** id *string - dialFn func(context.Context, string) (net.Conn, error) + dialFn func(context.Context, string, string) (net.Conn, error) connTimeoutOverhead time.Duration softwareName string // KIP-511 @@ -129,7 +129,7 @@ func defaultCfg() cfg { defaultID := "kgo" return cfg{ id: &defaultID, - dialFn: stddial, + dialFn: (&net.Dialer{Timeout: 10 * time.Second}).DialContext, softwareName: "kgo", softwareVersion: "0.1.0", @@ -260,7 +260,17 @@ func ConnTimeoutOverhead(overhead time.Duration) Opt { // that caused the dial. If the request is a client-internal request, the // context is the context on the client itself (which is canceled when the // client is closed). -func Dialer(fn func(context.Context, string) (net.Conn, error)) Opt { +// +// This function has the same signature as net.Dialer's DialContext and +// tls.Dialer's DialContext, meaning you can use this function like so: +// +// kgo.Dialer((&net.Dialer{Timeout: 10*time.Second}).DialContext) +// +// or +// +// kgo.Dialer((&tls.Dialer{...})}.DialContext) +// +func Dialer(fn func(ctx context.Context, network, host string) (net.Conn, error)) Opt { return clientOpt{func(cfg *cfg) { cfg.dialFn = fn }} }