diff --git a/client/options.go b/client/options.go index fcc395add7..dcb05dd14d 100644 --- a/client/options.go +++ b/client/options.go @@ -27,6 +27,8 @@ var ( DefaultPoolSize = 100 // DefaultPoolTTL sets the connection pool ttl. DefaultPoolTTL = time.Minute + // DefaultPoolCloseTimeout sets the connection pool colse timeout. + DefaultPoolCloseTimeout = time.Second ) // Options are the Client options. @@ -63,8 +65,9 @@ type Options struct { Wrappers []Wrapper // Connection Pool - PoolSize int - PoolTTL time.Duration + PoolSize int + PoolTTL time.Duration + PoolCloseTimeout time.Duration } // CallOptions are options used to make calls to a server. @@ -140,13 +143,14 @@ func NewOptions(options ...Option) Options { ConnectionTimeout: DefaultConnectionTimeout, DialTimeout: transport.DefaultDialTimeout, }, - PoolSize: DefaultPoolSize, - PoolTTL: DefaultPoolTTL, - Broker: broker.DefaultBroker, - Selector: selector.DefaultSelector, - Registry: registry.DefaultRegistry, - Transport: transport.DefaultTransport, - Logger: logger.DefaultLogger, + PoolSize: DefaultPoolSize, + PoolTTL: DefaultPoolTTL, + PoolCloseTimeout: DefaultPoolCloseTimeout, + Broker: broker.DefaultBroker, + Selector: selector.DefaultSelector, + Registry: registry.DefaultRegistry, + Transport: transport.DefaultTransport, + Logger: logger.DefaultLogger, } for _, o := range options { @@ -191,6 +195,13 @@ func PoolTTL(d time.Duration) Option { } } +// PoolCloseTimeout sets the connection pool close timeout. +func PoolCloseTimeout(d time.Duration) Option { + return func(o *Options) { + o.PoolCloseTimeout = d + } +} + // Registry to find nodes for a given service. func Registry(r registry.Registry) Option { return func(o *Options) { diff --git a/client/rpc_client.go b/client/rpc_client.go index f7a680e9f2..c1f793ea05 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -46,6 +46,7 @@ func newRPCClient(opt ...Option) Client { pool.Size(opts.PoolSize), pool.TTL(opts.PoolTTL), pool.Transport(opts.Transport), + pool.CloseTimeout(opts.PoolCloseTimeout), ) rc := &rpcClient{ @@ -148,7 +149,10 @@ func (r *rpcClient) call( c, err := r.pool.Get(address, dOpts...) if err != nil { - return merrors.InternalServerError("go.micro.client", "connection error: %v", err) + if c == nil { + return merrors.InternalServerError("go.micro.client", "connection error: %v", err) + } + logger.Log(log.ErrorLevel, "failed to close pool", err) } seq := atomic.AddUint64(&r.seq, 1) - 1 diff --git a/cmd/cmd.go b/cmd/cmd.go index 6bfc86d91d..07e3490501 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -565,6 +565,14 @@ func (c *cmd) Before(ctx *cli.Context) error { clientOpts = append(clientOpts, client.PoolTTL(d)) } + if t := ctx.String("client_pool_close_timeout"); len(t) > 0 { + d, err := time.ParseDuration(t) + if err != nil { + return fmt.Errorf("failed to parse client_pool_close_timeout: %v", t) + } + clientOpts = append(clientOpts, client.PoolCloseTimeout(d)) + } + // We have some command line opts for the server. // Lets set it up if len(serverOpts) > 0 { diff --git a/util/pool/default.go b/util/pool/default.go index 4850bc447c..b9a967965a 100644 --- a/util/pool/default.go +++ b/util/pool/default.go @@ -1,6 +1,7 @@ package pool import ( + "errors" "sync" "time" @@ -12,37 +13,40 @@ import ( type pool struct { tr transport.Transport - conns map[string][]*poolConn - size int - ttl time.Duration - - sync.Mutex + closeTimeout time.Duration + conns map[string][]*poolConn + mu sync.Mutex + size int + ttl time.Duration } type poolConn struct { - created time.Time transport.Client - id string + + closeTimeout time.Duration + created time.Time + id string } func newPool(options Options) *pool { return &pool{ - size: options.Size, - tr: options.Transport, - ttl: options.TTL, - conns: make(map[string][]*poolConn), + size: options.Size, + tr: options.Transport, + ttl: options.TTL, + closeTimeout: options.CloseTimeout, + conns: make(map[string][]*poolConn), } } func (p *pool) Close() error { - p.Lock() - defer p.Unlock() + p.mu.Lock() + defer p.mu.Unlock() var err error for k, c := range p.conns { for _, conn := range c { - if nerr := conn.Client.Close(); nerr != nil { + if nerr := conn.close(); nerr != nil { err = nerr } } @@ -67,7 +71,7 @@ func (p *poolConn) Created() time.Time { } func (p *pool) Get(addr string, opts ...transport.DialOption) (Conn, error) { - p.Lock() + p.mu.Lock() conns := p.conns[addr] // While we have conns check age and then return one @@ -79,22 +83,30 @@ func (p *pool) Get(addr string, opts ...transport.DialOption) (Conn, error) { // If conn is old kill it and move on if d := time.Since(conn.Created()); d > p.ttl { - if err := conn.Client.Close(); err != nil { - p.Unlock() - return nil, err + if err := conn.close(); err != nil { + p.mu.Unlock() + c, errConn := p.newConn(addr, opts) + if errConn != nil { + return nil, errConn + } + return c, err } continue } // We got a good conn, lets unlock and return it - p.Unlock() + p.mu.Unlock() return conn, nil } - p.Unlock() + p.mu.Unlock() + return p.newConn(addr, opts) +} + +func (p *pool) newConn(addr string, opts []transport.DialOption) (Conn, error) { // create new conn c, err := p.tr.Dial(addr, opts...) if err != nil { @@ -102,28 +114,46 @@ func (p *pool) Get(addr string, opts ...transport.DialOption) (Conn, error) { } return &poolConn{ - Client: c, - id: uuid.New().String(), - created: time.Now(), + Client: c, + id: uuid.New().String(), + closeTimeout: p.closeTimeout, + created: time.Now(), }, nil } func (p *pool) Release(conn Conn, err error) error { // don't store the conn if it has errored if err != nil { - return conn.(*poolConn).Client.Close() + return conn.(*poolConn).close() } // otherwise put it back for reuse - p.Lock() - defer p.Unlock() + p.mu.Lock() + defer p.mu.Unlock() conns := p.conns[conn.Remote()] if len(conns) >= p.size { - return conn.(*poolConn).Client.Close() + return conn.(*poolConn).close() } p.conns[conn.Remote()] = append(conns, conn.(*poolConn)) return nil } + +func (p *poolConn) close() error { + ch := make(chan error) + go func() { + defer close(ch) + ch <- p.Client.Close() + }() + t := time.NewTimer(p.closeTimeout) + var err error + select { + case <-t.C: + err = errors.New("unable to close in time") + case err = <-ch: + t.Stop() + } + return err +} diff --git a/util/pool/default_test.go b/util/pool/default_test.go index df248130f9..baeaa4a25c 100644 --- a/util/pool/default_test.go +++ b/util/pool/default_test.go @@ -73,12 +73,12 @@ func testPool(t *testing.T, size int, ttl time.Duration) { // release the conn p.Release(c, nil) - p.Lock() + p.mu.Lock() if i := len(p.conns[l.Addr()]); i > size { - p.Unlock() + p.mu.Unlock() t.Fatalf("pool size %d is greater than expected %d", i, size) } - p.Unlock() + p.mu.Unlock() } } diff --git a/util/pool/options.go b/util/pool/options.go index 223c7f4b0d..51f147c1cc 100644 --- a/util/pool/options.go +++ b/util/pool/options.go @@ -7,9 +7,10 @@ import ( ) type Options struct { - Transport transport.Transport - TTL time.Duration - Size int + Transport transport.Transport + TTL time.Duration + CloseTimeout time.Duration + Size int } type Option func(*Options) @@ -31,3 +32,9 @@ func TTL(t time.Duration) Option { o.TTL = t } } + +func CloseTimeout(t time.Duration) Option { + return func(o *Options) { + o.CloseTimeout = t + } +}