Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ const (
// ErrOptionPreventsStatement is C.ER_OPTION_PREVENTS_STATEMENT
ErrOptionPreventsStatement = C.ER_OPTION_PREVENTS_STATEMENT

// ErrServerLost is C.CR_SERVER_LOST (2013)
ErrServerLost = C.CR_SERVER_LOST
// ErrServerLost is C.CR_SERVER_LOST.
// It's hard-coded for now because it causes problems on import.
ErrServerLost = 2013

// RedactedPassword is the password value used in redacted configs
RedactedPassword = "****"
Expand Down
1 change: 0 additions & 1 deletion go/mysql/vtmysql.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// license that can be found in the LICENSE file.

#include <mysql.h>
#include <errmsg.h>

// This API provides convenient C wrapper functions for mysql client.

Expand Down
29 changes: 23 additions & 6 deletions go/pools/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ import (
)

var (
CLOSED_ERR = errors.New("resource pool is closed")
TIMEOUT_ERR = errors.New("resource pool timed out")
// ErrClosed is returned if ResourcePool is used when it's closed.
ErrClosed = errors.New("resource pool is closed")

// ErrTimeout is returned if a resource get times out.
ErrTimeout = errors.New("resource pool timed out")
)

// Factory is a function that can be used to create a resource.
type Factory func() (Resource, error)

// Every resource needs to suport the Resource interface.
// Resource defines the interface that every resource must provide.
// Thread synchronization between Close() and IsClosed()
// is the responsibility of the caller.
type Resource interface {
Expand Down Expand Up @@ -79,6 +82,7 @@ func (rp *ResourcePool) Close() {
_ = rp.SetCapacity(0)
}

// IsClosed returns true if the resource pool is closed.
func (rp *ResourcePool) IsClosed() (closed bool) {
return rp.capacity.Get() == 0
}
Expand All @@ -99,6 +103,10 @@ func (rp *ResourcePool) TryGet() (resource Resource, err error) {
}

func (rp *ResourcePool) get(ctx context.Context, wait bool) (resource Resource, err error) {
// If ctx has already expired, avoid racing with rp's resource channel.
if ctx.Err() != nil {
return nil, ErrTimeout
}
// Fetch
var wrapper resourceWrapper
var ok bool
Expand All @@ -112,12 +120,12 @@ func (rp *ResourcePool) get(ctx context.Context, wait bool) (resource Resource,
select {
case wrapper, ok = <-rp.resources:
case <-ctx.Done():
return nil, TIMEOUT_ERR
return nil, ErrTimeout
}
rp.recordWait(startTime)
}
if !ok {
return nil, CLOSED_ERR
return nil, ErrClosed
}

// Unwrap
Expand Down Expand Up @@ -168,7 +176,7 @@ func (rp *ResourcePool) SetCapacity(capacity int) error {
for {
oldcap = int(rp.capacity.Get())
if oldcap == 0 {
return CLOSED_ERR
return ErrClosed
}
if oldcap == capacity {
return nil
Expand Down Expand Up @@ -201,39 +209,48 @@ func (rp *ResourcePool) recordWait(start time.Time) {
rp.waitTime.Add(time.Now().Sub(start))
}

// SetIdleTimeout sets the idle timeout.
func (rp *ResourcePool) SetIdleTimeout(idleTimeout time.Duration) {
rp.idleTimeout.Set(idleTimeout)
}

// StatsJSON returns the stats in JSON format.
func (rp *ResourcePool) StatsJSON() string {
c, a, mx, wc, wt, it := rp.Stats()
return fmt.Sprintf(`{"Capacity": %v, "Available": %v, "MaxCapacity": %v, "WaitCount": %v, "WaitTime": %v, "IdleTimeout": %v}`, c, a, mx, wc, int64(wt), int64(it))
}

// Stats returns the stats.
func (rp *ResourcePool) Stats() (capacity, available, maxCap, waitCount int64, waitTime, idleTimeout time.Duration) {
return rp.Capacity(), rp.Available(), rp.MaxCap(), rp.WaitCount(), rp.WaitTime(), rp.IdleTimeout()
}

// Capacity returns the capacity.
func (rp *ResourcePool) Capacity() int64 {
return rp.capacity.Get()
}

// Available returns the number of currently unused resources.
func (rp *ResourcePool) Available() int64 {
return int64(len(rp.resources))
}

// MaxCap returns the max capacity.
func (rp *ResourcePool) MaxCap() int64 {
return int64(cap(rp.resources))
}

// WaitCount returns the total number of waits.
func (rp *ResourcePool) WaitCount() int64 {
return rp.waitCount.Get()
}

// WaitTime returns the total wait time.
func (rp *ResourcePool) WaitTime() time.Duration {
return rp.waitTime.Get()
}

// IdleTimeout returns the idle timeout.
func (rp *ResourcePool) IdleTimeout() time.Duration {
return rp.idleTimeout.Get()
}
17 changes: 17 additions & 0 deletions go/pools/resource_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,20 @@ func TestTimeout(t *testing.T) {
}
p.Put(r)
}

func TestExpired(t *testing.T) {
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second)
defer p.Close()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second))
r, err := p.Get(ctx)
if err == nil {
p.Put(r)
}
cancel()
want := "resource pool timed out"
if err == nil || err.Error() != want {
t.Errorf("got %v, want %s", err, want)
}
}
8 changes: 6 additions & 2 deletions go/vt/dbconnpool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,12 @@ func (cp *ConnectionPool) Get(timeout time.Duration) (PoolConnection, error) {
if p == nil {
return nil, ErrConnPoolClosed
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ctx := context.Background()
if timeout != 0 {
var cancel func()
ctx, cancel = context.WithTimeout(context.Background(), timeout)
defer cancel()
}
r, err := p.Get(ctx)
if err != nil {
return nil, err
Expand Down
10 changes: 8 additions & 2 deletions go/vt/tabletserver/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,18 @@ func (axp *TxPool) transactionKiller() {
// Begin begins a transaction, and returns the associated transaction id.
// Subsequent statements can access the connection through the transaction id.
func (axp *TxPool) Begin(ctx context.Context) int64 {
conn, err := axp.pool.Get(ctx)
poolCtx := ctx
if deadline, ok := ctx.Deadline(); ok {
var cancel func()
poolCtx, cancel = context.WithDeadline(ctx, deadline.Add(-10*time.Millisecond))
defer cancel()
}
conn, err := axp.pool.Get(poolCtx)
if err != nil {
switch err {
case ErrConnPoolClosed:
panic(connPoolClosedErr)
case pools.TIMEOUT_ERR:
case pools.ErrTimeout:
axp.LogActive()
panic(NewTabletError(ErrTxPoolFull, "Transaction pool connection limit exceeded"))
}
Expand Down
22 changes: 19 additions & 3 deletions go/vt/tabletserver/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,28 @@ func TestBeginWithPoolTimeout(t *testing.T) {
// set pool capacity to 1
txPool.pool.SetCapacity(1)
defer txPool.Close()
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(-10*time.Second))
defer handleAndVerifyTabletError(t, "expect to get an error", ErrTxPoolFull)
// start the first transaction
txPool.Begin(ctx)
txPool.Begin(context.Background())
// start the second transaction, which should fail due to
// ErrTxPoolFull error
ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer handleAndVerifyTabletError(t, "expect to get an error", ErrTxPoolFull)
txPool.Begin(ctx)
}

func TestBeginWithShortDeadline(t *testing.T) {
fakesqldb.Register()
txPool := newTxPool(false)
appParams := sqldb.ConnParams{}
dbaParams := sqldb.ConnParams{}
txPool.Open(&appParams, &dbaParams)
// set pool capacity to 1
txPool.pool.SetCapacity(1)
defer txPool.Close()

// A timeout < 10ms should always fail.
ctx, _ := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer handleAndVerifyTabletError(t, "expect to get an error", ErrTxPoolFull)
txPool.Begin(ctx)
}

Expand Down