diff --git a/go/vt/vttablet/endtoend/config_test.go b/go/vt/vttablet/endtoend/config_test.go index cb47068c47d..a5a3fc5520f 100644 --- a/go/vt/vttablet/endtoend/config_test.go +++ b/go/vt/vttablet/endtoend/config_test.go @@ -51,9 +51,6 @@ func TestConfigVars(t *testing.T) { tag string val int }{{ - tag: "BeginTimeout", - val: int(tabletenv.Config.TxPoolTimeout * 1e9), - }, { tag: "ConnPoolAvailable", val: tabletenv.Config.PoolSize, }, { @@ -115,6 +112,9 @@ func TestConfigVars(t *testing.T) { val: tabletenv.Config.TransactionCap, }, { tag: "TransactionPoolTimeout", + val: int(tabletenv.Config.TxPoolTimeout * 1e9), + }, { + tag: "TransactionTimeout", val: int(tabletenv.Config.TransactionTimeout * 1e9), }} for _, tcase := range cases { diff --git a/go/vt/vttablet/endtoend/transaction_test.go b/go/vt/vttablet/endtoend/transaction_test.go index ae15d5da92d..41f2330f366 100644 --- a/go/vt/vttablet/endtoend/transaction_test.go +++ b/go/vt/vttablet/endtoend/transaction_test.go @@ -329,9 +329,6 @@ func TestTxPoolSize(t *testing.T) { defer framework.Server.SetTxPoolSize(framework.Server.TxPoolSize()) framework.Server.SetTxPoolSize(1) - defer framework.Server.BeginTimeout.Set(framework.Server.BeginTimeout.Get()) - timeout := 1 * time.Millisecond - framework.Server.BeginTimeout.Set(timeout) vend := framework.DebugVars() if err := verifyIntValue(vend, "TransactionPoolAvailable", 0); err != nil { t.Error(err) @@ -339,9 +336,6 @@ func TestTxPoolSize(t *testing.T) { if err := verifyIntValue(vend, "TransactionPoolCapacity", 1); err != nil { t.Error(err) } - if err := verifyIntValue(vend, "BeginTimeout", int(timeout)); err != nil { - t.Error(err) - } client2 := framework.NewClient() err = client2.Begin(false) @@ -358,8 +352,16 @@ func TestTxTimeout(t *testing.T) { vstart := framework.DebugVars() defer framework.Server.SetTxTimeout(framework.Server.TxTimeout()) - framework.Server.SetTxTimeout(1 * time.Millisecond) - if err := verifyIntValue(framework.DebugVars(), "TransactionPoolTimeout", int(1*time.Millisecond)); err != nil { + txTimeout := 1 * time.Millisecond + framework.Server.SetTxTimeout(txTimeout) + if err := verifyIntValue(framework.DebugVars(), "TransactionTimeout", int(txTimeout)); err != nil { + t.Error(err) + } + + defer framework.Server.SetTxPoolTimeout(framework.Server.TxPoolTimeout()) + txPoolTimeout := 2 * time.Millisecond + framework.Server.SetTxPoolTimeout(txPoolTimeout) + if err := verifyIntValue(framework.DebugVars(), "TransactionPoolTimeout", int(txPoolTimeout)); err != nil { t.Error(err) } diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 00c118cae93..1af43c7f24f 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -137,7 +137,6 @@ func stateInfo(state int64) string { // a subcomponent. These should also be idempotent. type TabletServer struct { QueryTimeout sync2.AtomicDuration - BeginTimeout sync2.AtomicDuration TerseErrors bool enableHotRowProtection bool @@ -265,7 +264,6 @@ func NewTabletServerWithNilTopoServer(config tabletenv.TabletConfig) *TabletServ func NewTabletServer(config tabletenv.TabletConfig, topoServer *topo.Server, alias topodatapb.TabletAlias) *TabletServer { tsv := &TabletServer{ QueryTimeout: sync2.NewAtomicDuration(time.Duration(config.QueryTimeout * 1e9)), - BeginTimeout: sync2.NewAtomicDuration(time.Duration(config.TxPoolTimeout * 1e9)), TerseErrors: config.TerseErrors, enableHotRowProtection: config.EnableHotRowProtection || config.EnableHotRowProtectionDryRun, checkMySQLThrottler: sync2.NewSemaphore(1, 0), @@ -304,7 +302,6 @@ func NewTabletServer(config tabletenv.TabletConfig, topoServer *topo.Server, ali }) stats.NewGaugeDurationFunc("QueryTimeout", "Tablet server query timeout", tsv.QueryTimeout.Get) stats.NewGaugeDurationFunc("QueryPoolTimeout", "Tablet server timeout to get a connection from the query pool", tsv.qe.connTimeout.Get) - stats.NewGaugeDurationFunc("BeginTimeout", "Tablet server begin timeout", tsv.BeginTimeout.Get) }) // TODO(sougou): move this up once the stats naming problem is fixed. tsv.vstreamer = vstreamer.NewEngine(srvTopoServer, tsv.se) @@ -772,7 +769,7 @@ func (tsv *TabletServer) SchemaEngine() *schema.Engine { // Begin starts a new transaction. This is allowed only if the state is StateServing. func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (transactionID int64, err error) { err = tsv.execRequest( - ctx, tsv.BeginTimeout.Get(), + ctx, tsv.QueryTimeout.Get(), "Begin", "begin", nil, target, options, true /* isBegin */, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { @@ -2180,6 +2177,17 @@ func (tsv *TabletServer) TxTimeout() time.Duration { return tsv.te.txPool.Timeout() } +// SetTxPoolTimeout changes the transaction pool timeout to the specified value. +// This function should only be used for testing. +func (tsv *TabletServer) SetTxPoolTimeout(val time.Duration) { + tsv.te.txPool.SetPoolTimeout(val) +} + +// TxPoolTimeout returns the transaction pool timeout. +func (tsv *TabletServer) TxPoolTimeout() time.Duration { + return tsv.te.txPool.PoolTimeout() +} + // SetQueryPlanCacheCap changes the pool size to the specified value. // This function should only be used for testing. func (tsv *TabletServer) SetQueryPlanCacheCap(val int) { diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index 148569a022f..37ae52b6b20 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -118,6 +118,7 @@ func NewTxEngine(checker connpool.MySQLChecker, config tabletenv.TabletConfig) * config.FoundRowsPoolSize, config.TxPoolPrefillParallelism, time.Duration(config.TransactionTimeout*1e9), + time.Duration(config.TxPoolTimeout*1e9), time.Duration(config.IdleTimeout*1e9), config.TxPoolWaiterCap, checker, diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go index aaa74dbcac0..774b4fb0de6 100644 --- a/go/vt/vttablet/tabletserver/tx_pool.go +++ b/go/vt/vttablet/tabletserver/tx_pool.go @@ -88,13 +88,14 @@ type TxPool struct { // connections with CLIENT_FOUND_ROWS flag set. A separate // pool is needed because this option can only be set at // connection time. - foundRowsPool *connpool.Pool - activePool *pools.Numbered - lastID sync2.AtomicInt64 - timeout sync2.AtomicDuration - ticks *timer.Timer - checker connpool.MySQLChecker - limiter txlimiter.TxLimiter + foundRowsPool *connpool.Pool + activePool *pools.Numbered + lastID sync2.AtomicInt64 + transactionTimeout sync2.AtomicDuration + transactionPoolTimeout sync2.AtomicDuration + ticks *timer.Timer + checker connpool.MySQLChecker + limiter txlimiter.TxLimiter // Tracking culprits that cause tx pool full errors. logMu sync.Mutex lastLog time.Time @@ -108,27 +109,30 @@ func NewTxPool( capacity int, foundRowsCapacity int, prefillParallelism int, - timeout time.Duration, + transactionTimeout time.Duration, + transactionPoolTimeout time.Duration, idleTimeout time.Duration, waiterCap int, checker connpool.MySQLChecker, limiter txlimiter.TxLimiter) *TxPool { axp := &TxPool{ - conns: connpool.New(prefix+"TransactionPool", capacity, prefillParallelism, idleTimeout, checker), - foundRowsPool: connpool.New(prefix+"FoundRowsPool", foundRowsCapacity, prefillParallelism, idleTimeout, checker), - activePool: pools.NewNumbered(), - lastID: sync2.NewAtomicInt64(time.Now().UnixNano()), - timeout: sync2.NewAtomicDuration(timeout), - waiterCap: sync2.NewAtomicInt64(int64(waiterCap)), - waiters: sync2.NewAtomicInt64(0), - ticks: timer.NewTimer(timeout / 10), - checker: checker, - limiter: limiter, + conns: connpool.New(prefix+"TransactionPool", capacity, prefillParallelism, idleTimeout, checker), + foundRowsPool: connpool.New(prefix+"FoundRowsPool", foundRowsCapacity, prefillParallelism, idleTimeout, checker), + activePool: pools.NewNumbered(), + lastID: sync2.NewAtomicInt64(time.Now().UnixNano()), + transactionTimeout: sync2.NewAtomicDuration(transactionTimeout), + transactionPoolTimeout: sync2.NewAtomicDuration(transactionPoolTimeout), + waiterCap: sync2.NewAtomicInt64(int64(waiterCap)), + waiters: sync2.NewAtomicInt64(0), + ticks: timer.NewTimer(transactionTimeout / 10), + checker: checker, + limiter: limiter, } txOnce.Do(func() { // Careful: conns also exports name+"xxx" vars, // but we know it doesn't export Timeout. - stats.NewGaugeDurationFunc(prefix+"TransactionPoolTimeout", "Transaction pool timeout", axp.timeout.Get) + stats.NewGaugeDurationFunc(prefix+"TransactionTimeout", "Transaction timeout", axp.transactionTimeout.Get) + stats.NewGaugeDurationFunc(prefix+"TransactionPoolTimeout", "Timeout to get a connection from the transaction pool", axp.transactionPoolTimeout.Get) stats.NewGaugeFunc(prefix+"TransactionPoolWaiters", "Transaction pool waiters", axp.waiters.Get) }) return axp @@ -230,10 +234,12 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) ( axp.limiter.Release(immediateCaller, effectiveCaller) }() + poolCtx, poolCancel := context.WithTimeout(ctx, axp.transactionPoolTimeout.Get()) + defer poolCancel() if options.GetClientFoundRows() { - conn, err = axp.foundRowsPool.Get(ctx) + conn, err = axp.foundRowsPool.Get(poolCtx) } else { - conn, err = axp.conns.Get(ctx) + conn, err = axp.conns.Get(poolCtx) } if err != nil { switch err { @@ -388,15 +394,25 @@ func (axp *TxPool) LogActive() { // Timeout returns the transaction timeout. func (axp *TxPool) Timeout() time.Duration { - return axp.timeout.Get() + return axp.transactionTimeout.Get() } // SetTimeout sets the transaction timeout. func (axp *TxPool) SetTimeout(timeout time.Duration) { - axp.timeout.Set(timeout) + axp.transactionTimeout.Set(timeout) axp.ticks.SetInterval(timeout / 10) } +// PoolTimeout returns the transaction pool timeout. +func (axp *TxPool) PoolTimeout() time.Duration { + return axp.transactionPoolTimeout.Get() +} + +// SetPoolTimeout sets the transaction pool timeout. +func (axp *TxPool) SetPoolTimeout(timeout time.Duration) { + axp.transactionPoolTimeout.Set(timeout) +} + // TxConnection is meant for executing transactions. It can return itself to // the tx pool correctly. It also does not retry statements if there // are failures. diff --git a/go/vt/vttablet/tabletserver/tx_pool_test.go b/go/vt/vttablet/tabletserver/tx_pool_test.go index 5310cb2226e..733270dfbaa 100644 --- a/go/vt/vttablet/tabletserver/tx_pool_test.go +++ b/go/vt/vttablet/tabletserver/tx_pool_test.go @@ -693,6 +693,7 @@ func newTxPool() *TxPool { poolName := fmt.Sprintf("TestTransactionPool-%d", randID) transactionCap := 300 transactionTimeout := time.Duration(30 * time.Second) + transactionPoolTimeout := time.Duration(40 * time.Second) waiterCap := 500000 idleTimeout := time.Duration(30 * time.Second) limiter := &txlimiter.TxAllowAll{} @@ -702,6 +703,7 @@ func newTxPool() *TxPool { transactionCap, 0, transactionTimeout, + transactionPoolTimeout, idleTimeout, waiterCap, DummyChecker,