Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go/vt/vttablet/endtoend/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ func TestConfigVars(t *testing.T) {
tag string
val int
}{{
tag: "BeginTimeout",
val: int(tabletenv.Config.TxPoolTimeout * 1e9),
}, {
tag: "ConnPoolAvailable",
val: tabletenv.Config.PoolSize,
}, {
Expand Down Expand Up @@ -115,6 +112,9 @@ func TestConfigVars(t *testing.T) {
val: tabletenv.Config.TransactionCap,
}, {
tag: "TransactionPoolTimeout",
val: int(tabletenv.Config.TxPoolTimeout * 1e9),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch. We shoudl add a test for TransactionTimeout.

}, {
tag: "TransactionTimeout",
val: int(tabletenv.Config.TransactionTimeout * 1e9),
}}
for _, tcase := range cases {
Expand Down
18 changes: 10 additions & 8 deletions go/vt/vttablet/endtoend/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,19 +329,13 @@ func TestTxPoolSize(t *testing.T) {

defer framework.Server.SetTxPoolSize(framework.Server.TxPoolSize())
framework.Server.SetTxPoolSize(1)
defer framework.Server.BeginTimeout.Set(framework.Server.BeginTimeout.Get())
timeout := 1 * time.Millisecond
framework.Server.BeginTimeout.Set(timeout)
vend := framework.DebugVars()
if err := verifyIntValue(vend, "TransactionPoolAvailable", 0); err != nil {
t.Error(err)
}
if err := verifyIntValue(vend, "TransactionPoolCapacity", 1); err != nil {
t.Error(err)
}
if err := verifyIntValue(vend, "BeginTimeout", int(timeout)); err != nil {
t.Error(err)
}

client2 := framework.NewClient()
err = client2.Begin(false)
Expand All @@ -358,8 +352,16 @@ func TestTxTimeout(t *testing.T) {
vstart := framework.DebugVars()

defer framework.Server.SetTxTimeout(framework.Server.TxTimeout())
framework.Server.SetTxTimeout(1 * time.Millisecond)
if err := verifyIntValue(framework.DebugVars(), "TransactionPoolTimeout", int(1*time.Millisecond)); err != nil {
txTimeout := 1 * time.Millisecond
framework.Server.SetTxTimeout(txTimeout)
if err := verifyIntValue(framework.DebugVars(), "TransactionTimeout", int(txTimeout)); err != nil {
t.Error(err)
}

defer framework.Server.SetTxPoolTimeout(framework.Server.TxPoolTimeout())
txPoolTimeout := 2 * time.Millisecond
framework.Server.SetTxPoolTimeout(txPoolTimeout)
if err := verifyIntValue(framework.DebugVars(), "TransactionPoolTimeout", int(txPoolTimeout)); err != nil {
t.Error(err)
}

Expand Down
16 changes: 12 additions & 4 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ func stateInfo(state int64) string {
// a subcomponent. These should also be idempotent.
type TabletServer struct {
QueryTimeout sync2.AtomicDuration
BeginTimeout sync2.AtomicDuration
TerseErrors bool
enableHotRowProtection bool

Expand Down Expand Up @@ -265,7 +264,6 @@ func NewTabletServerWithNilTopoServer(config tabletenv.TabletConfig) *TabletServ
func NewTabletServer(config tabletenv.TabletConfig, topoServer *topo.Server, alias topodatapb.TabletAlias) *TabletServer {
tsv := &TabletServer{
QueryTimeout: sync2.NewAtomicDuration(time.Duration(config.QueryTimeout * 1e9)),
BeginTimeout: sync2.NewAtomicDuration(time.Duration(config.TxPoolTimeout * 1e9)),
TerseErrors: config.TerseErrors,
enableHotRowProtection: config.EnableHotRowProtection || config.EnableHotRowProtectionDryRun,
checkMySQLThrottler: sync2.NewSemaphore(1, 0),
Expand Down Expand Up @@ -304,7 +302,6 @@ func NewTabletServer(config tabletenv.TabletConfig, topoServer *topo.Server, ali
})
stats.NewGaugeDurationFunc("QueryTimeout", "Tablet server query timeout", tsv.QueryTimeout.Get)
stats.NewGaugeDurationFunc("QueryPoolTimeout", "Tablet server timeout to get a connection from the query pool", tsv.qe.connTimeout.Get)
stats.NewGaugeDurationFunc("BeginTimeout", "Tablet server begin timeout", tsv.BeginTimeout.Get)
})
// TODO(sougou): move this up once the stats naming problem is fixed.
tsv.vstreamer = vstreamer.NewEngine(srvTopoServer, tsv.se)
Expand Down Expand Up @@ -772,7 +769,7 @@ func (tsv *TabletServer) SchemaEngine() *schema.Engine {
// Begin starts a new transaction. This is allowed only if the state is StateServing.
func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (transactionID int64, err error) {
err = tsv.execRequest(
ctx, tsv.BeginTimeout.Get(),
ctx, tsv.QueryTimeout.Get(),
"Begin", "begin", nil,
target, options, true /* isBegin */, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
Expand Down Expand Up @@ -2180,6 +2177,17 @@ func (tsv *TabletServer) TxTimeout() time.Duration {
return tsv.te.txPool.Timeout()
}

// SetTxPoolTimeout changes the transaction pool timeout to the specified value.
// This function should only be used for testing.
func (tsv *TabletServer) SetTxPoolTimeout(val time.Duration) {
tsv.te.txPool.SetPoolTimeout(val)
}

// TxPoolTimeout returns the transaction pool timeout.
func (tsv *TabletServer) TxPoolTimeout() time.Duration {
return tsv.te.txPool.PoolTimeout()
}

// SetQueryPlanCacheCap changes the pool size to the specified value.
// This function should only be used for testing.
func (tsv *TabletServer) SetQueryPlanCacheCap(val int) {
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/tx_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func NewTxEngine(checker connpool.MySQLChecker, config tabletenv.TabletConfig) *
config.FoundRowsPoolSize,
config.TxPoolPrefillParallelism,
time.Duration(config.TransactionTimeout*1e9),
time.Duration(config.TxPoolTimeout*1e9),
time.Duration(config.IdleTimeout*1e9),
config.TxPoolWaiterCap,
checker,
Expand Down
62 changes: 39 additions & 23 deletions go/vt/vttablet/tabletserver/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,14 @@ type TxPool struct {
// connections with CLIENT_FOUND_ROWS flag set. A separate
// pool is needed because this option can only be set at
// connection time.
foundRowsPool *connpool.Pool
activePool *pools.Numbered
lastID sync2.AtomicInt64
timeout sync2.AtomicDuration
ticks *timer.Timer
checker connpool.MySQLChecker
limiter txlimiter.TxLimiter
foundRowsPool *connpool.Pool
activePool *pools.Numbered
lastID sync2.AtomicInt64
transactionTimeout sync2.AtomicDuration
transactionPoolTimeout sync2.AtomicDuration
ticks *timer.Timer
checker connpool.MySQLChecker
limiter txlimiter.TxLimiter
// Tracking culprits that cause tx pool full errors.
logMu sync.Mutex
lastLog time.Time
Expand All @@ -108,27 +109,30 @@ func NewTxPool(
capacity int,
foundRowsCapacity int,
prefillParallelism int,
timeout time.Duration,
transactionTimeout time.Duration,
transactionPoolTimeout time.Duration,
idleTimeout time.Duration,
waiterCap int,
checker connpool.MySQLChecker,
limiter txlimiter.TxLimiter) *TxPool {
axp := &TxPool{
conns: connpool.New(prefix+"TransactionPool", capacity, prefillParallelism, idleTimeout, checker),
foundRowsPool: connpool.New(prefix+"FoundRowsPool", foundRowsCapacity, prefillParallelism, idleTimeout, checker),
activePool: pools.NewNumbered(),
lastID: sync2.NewAtomicInt64(time.Now().UnixNano()),
timeout: sync2.NewAtomicDuration(timeout),
waiterCap: sync2.NewAtomicInt64(int64(waiterCap)),
waiters: sync2.NewAtomicInt64(0),
ticks: timer.NewTimer(timeout / 10),
checker: checker,
limiter: limiter,
conns: connpool.New(prefix+"TransactionPool", capacity, prefillParallelism, idleTimeout, checker),
foundRowsPool: connpool.New(prefix+"FoundRowsPool", foundRowsCapacity, prefillParallelism, idleTimeout, checker),
activePool: pools.NewNumbered(),
lastID: sync2.NewAtomicInt64(time.Now().UnixNano()),
transactionTimeout: sync2.NewAtomicDuration(transactionTimeout),
transactionPoolTimeout: sync2.NewAtomicDuration(transactionPoolTimeout),
waiterCap: sync2.NewAtomicInt64(int64(waiterCap)),
waiters: sync2.NewAtomicInt64(0),
ticks: timer.NewTimer(transactionTimeout / 10),
checker: checker,
limiter: limiter,
}
txOnce.Do(func() {
// Careful: conns also exports name+"xxx" vars,
// but we know it doesn't export Timeout.
stats.NewGaugeDurationFunc(prefix+"TransactionPoolTimeout", "Transaction pool timeout", axp.timeout.Get)
stats.NewGaugeDurationFunc(prefix+"TransactionTimeout", "Transaction timeout", axp.transactionTimeout.Get)
stats.NewGaugeDurationFunc(prefix+"TransactionPoolTimeout", "Timeout to get a connection from the transaction pool", axp.transactionPoolTimeout.Get)
stats.NewGaugeFunc(prefix+"TransactionPoolWaiters", "Transaction pool waiters", axp.waiters.Get)
})
return axp
Expand Down Expand Up @@ -230,10 +234,12 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (
axp.limiter.Release(immediateCaller, effectiveCaller)
}()

poolCtx, poolCancel := context.WithTimeout(ctx, axp.transactionPoolTimeout.Get())
defer poolCancel()
if options.GetClientFoundRows() {
conn, err = axp.foundRowsPool.Get(ctx)
conn, err = axp.foundRowsPool.Get(poolCtx)
} else {
conn, err = axp.conns.Get(ctx)
conn, err = axp.conns.Get(poolCtx)
}
if err != nil {
switch err {
Expand Down Expand Up @@ -388,15 +394,25 @@ func (axp *TxPool) LogActive() {

// Timeout returns the transaction timeout.
func (axp *TxPool) Timeout() time.Duration {
return axp.timeout.Get()
return axp.transactionTimeout.Get()
}

// SetTimeout sets the transaction timeout.
func (axp *TxPool) SetTimeout(timeout time.Duration) {
axp.timeout.Set(timeout)
axp.transactionTimeout.Set(timeout)
axp.ticks.SetInterval(timeout / 10)
}

// PoolTimeout returns the transaction pool timeout.
func (axp *TxPool) PoolTimeout() time.Duration {
return axp.transactionPoolTimeout.Get()
}

// SetPoolTimeout sets the transaction pool timeout.
func (axp *TxPool) SetPoolTimeout(timeout time.Duration) {
axp.transactionPoolTimeout.Set(timeout)
}

// TxConnection is meant for executing transactions. It can return itself to
// the tx pool correctly. It also does not retry statements if there
// are failures.
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletserver/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ func newTxPool() *TxPool {
poolName := fmt.Sprintf("TestTransactionPool-%d", randID)
transactionCap := 300
transactionTimeout := time.Duration(30 * time.Second)
transactionPoolTimeout := time.Duration(40 * time.Second)
waiterCap := 500000
idleTimeout := time.Duration(30 * time.Second)
limiter := &txlimiter.TxAllowAll{}
Expand All @@ -702,6 +703,7 @@ func newTxPool() *TxPool {
transactionCap,
0,
transactionTimeout,
transactionPoolTimeout,
idleTimeout,
waiterCap,
DummyChecker,
Expand Down