Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions go/vt/vttablet/tabletserver/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func (tp *TxPool) transactionKiller() {
if conn.IsTainted() && conn.IsInTransaction() {
tp.env.Stats().KillCounters.Add("Transactions", 1)
}
if conn.IsInTransaction() {
tp.txComplete(conn, tx.TxKill)
}
conn.Releasef("exceeded timeout: %v", tp.Timeout())
}
}
Expand Down Expand Up @@ -236,6 +239,13 @@ func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, re
return nil, "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "per-user transaction pool connection limit exceeded")
}
conn, err = tp.createConn(ctx, options)
defer func() {
if err != nil {
// The transaction limiter frees transactions on rollback or commit. If we fail to create the transaction,
// release immediately since there will be no rollback or commit.
tp.limiter.Release(immediateCaller, effectiveCaller)
}
}()
}
if err != nil {
return nil, "", err
Expand Down
169 changes: 140 additions & 29 deletions go/vt/vttablet/tabletserver/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package tabletserver
import (
"context"
"fmt"
"sync"
"testing"
"time"

"vitess.io/vitess/go/vt/callerid"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

Expand All @@ -30,8 +32,6 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/vttablet/tabletserver/txlimiter"

"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
Expand All @@ -40,7 +40,7 @@ import (
)

func TestTxPoolExecuteCommit(t *testing.T) {
db, txPool, closer := setup(t)
db, txPool, _, closer := setup(t)
defer closer()

sql := "select 'this is a query'"
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestTxPoolExecuteCommit(t *testing.T) {
}

func TestTxPoolExecuteRollback(t *testing.T) {
db, txPool, closer := setup(t)
db, txPool, _, closer := setup(t)
defer closer()

conn, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
Expand All @@ -92,7 +92,7 @@ func TestTxPoolExecuteRollback(t *testing.T) {
}

func TestTxPoolExecuteRollbackOnClosedConn(t *testing.T) {
db, txPool, closer := setup(t)
db, txPool, _, closer := setup(t)
defer closer()

conn, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
Expand All @@ -109,7 +109,7 @@ func TestTxPoolExecuteRollbackOnClosedConn(t *testing.T) {
}

func TestTxPoolRollbackNonBusy(t *testing.T) {
db, txPool, closer := setup(t)
db, txPool, _, closer := setup(t)
defer closer()

// start two transactions, and mark one of them as unused
Expand All @@ -135,7 +135,7 @@ func TestTxPoolRollbackNonBusy(t *testing.T) {
}

func TestTxPoolTransactionIsolation(t *testing.T) {
db, txPool, closer := setup(t)
db, txPool, _, closer := setup(t)
defer closer()

c2, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_READ_COMMITTED}, false, 0, nil)
Expand All @@ -146,7 +146,7 @@ func TestTxPoolTransactionIsolation(t *testing.T) {
}

func TestTxPoolAutocommit(t *testing.T) {
db, txPool, closer := setup(t)
db, txPool, _, closer := setup(t)
defer closer()

// Start a transaction with autocommit. This will ensure that the executor does not send begin/commit statements
Expand Down Expand Up @@ -192,7 +192,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2006_Transient(t *testing.T) {
func primeTxPoolWithConnection(t *testing.T) (*fakesqldb.DB, *TxPool) {
t.Helper()
db := fakesqldb.New(t)
txPool := newTxPool()
txPool, _ := newTxPool()
// Set the capacity to 1 to ensure that the db connection is reused.
txPool.scp.conns.SetCapacity(1)
txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
Expand All @@ -209,17 +209,42 @@ func primeTxPoolWithConnection(t *testing.T) (*fakesqldb.DB, *TxPool) {
}

func TestTxPoolBeginWithError(t *testing.T) {
db, txPool, closer := setup(t)
db, txPool, limiter, closer := setup(t)
defer closer()
db.AddRejectedQuery("begin", errRejected)
_, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)

im := &querypb.VTGateCallerID{
Username: "user",
}
ef := &vtrpcpb.CallerID{
Principal: "principle",
}

ctxWithCallerId := callerid.NewContext(ctx, ef, im)
_, _, err := txPool.Begin(ctxWithCallerId, &querypb.ExecuteOptions{}, false, 0, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "error: rejected")
require.Equal(t, vtrpcpb.Code_UNKNOWN, vterrors.Code(err), "wrong error code for Begin error")

// Regression test for #6727: make sure the tx limiter is decremented if grabbing a connection
// errors for whatever reason.
require.Equal(t,
[]fakeLimiterEntry{
{
immediate: im,
effective: ef,
isRelease: false,
},
{
immediate: im,
effective: ef,
isRelease: true,
},
}, limiter.Actions())
}

func TestTxPoolBeginWithPreQueryError(t *testing.T) {
db, txPool, closer := setup(t)
db, txPool, _, closer := setup(t)
defer closer()
db.AddRejectedQuery("pre_query", errRejected)
_, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, []string{"pre_query"})
Expand All @@ -230,7 +255,7 @@ func TestTxPoolBeginWithPreQueryError(t *testing.T) {

func TestTxPoolCancelledContextError(t *testing.T) {
// given
db, txPool, closer := setup(t)
db, txPool, _, closer := setup(t)
defer closer()
ctx, cancel := context.WithCancel(ctx)
cancel()
Expand All @@ -251,7 +276,7 @@ func TestTxPoolWaitTimeoutError(t *testing.T) {
env.Config().TxPool.MaxWaiters = 0
env.Config().TxPool.TimeoutSeconds = 1
// given
db, txPool, closer := setupWithEnv(t, env)
db, txPool, _, closer := setupWithEnv(t, env)
defer closer()

// lock the only connection in the pool.
Expand All @@ -272,7 +297,7 @@ func TestTxPoolWaitTimeoutError(t *testing.T) {

func TestTxPoolRollbackFailIsPassedThrough(t *testing.T) {
sql := "alter table test_table add test_column int"
db, txPool, closer := setup(t)
db, txPool, _, closer := setup(t)
defer closer()
db.AddRejectedQuery("rollback", errRejected)

Expand All @@ -291,7 +316,7 @@ func TestTxPoolRollbackFailIsPassedThrough(t *testing.T) {
}

func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) {
db, txPool, _ := setup(t)
db, txPool, _, _ := setup(t)
defer db.Close()
conn1, _, _ := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
id := conn1.ID()
Expand All @@ -312,7 +337,7 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) {

assertErrorMatch(id, "pool closed")

txPool = newTxPool()
txPool, _ = newTxPool()
txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())

conn1, _, _ = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
Expand All @@ -324,7 +349,7 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) {

assertErrorMatch(id, "transaction committed")

txPool = newTxPool()
txPool, _ = newTxPool()
txPool.SetTimeout(1 * time.Millisecond)
txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
defer txPool.Close()
Expand All @@ -338,7 +363,7 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) {
}

func TestTxPoolCloseKillsStrayTransactions(t *testing.T) {
_, txPool, closer := setup(t)
_, txPool, _, closer := setup(t)
defer closer()

startingStray := txPool.env.Stats().InternalErrors.Counts()["StrayTransactions"]
Expand All @@ -354,13 +379,59 @@ func TestTxPoolCloseKillsStrayTransactions(t *testing.T) {
require.Equal(t, 0, txPool.scp.Capacity())
}

func newTxPool() *TxPool {
func TestTxTimeoutKillsTransactions(t *testing.T) {
env := newEnv("TabletServerTest")
env.Config().TxPool.Size = 1
env.Config().TxPool.MaxWaiters = 0
env.Config().Oltp.TxTimeoutSeconds = 1
_, txPool, limiter, closer := setupWithEnv(t, env)
defer closer()
startingKills := txPool.env.Stats().KillCounters.Counts()["Transactions"]

im := &querypb.VTGateCallerID{
Username: "user",
}
ef := &vtrpcpb.CallerID{
Principal: "principle",
}

ctxWithCallerId := callerid.NewContext(ctx, ef, im)

// Start transaction.
conn, _, err := txPool.Begin(ctxWithCallerId, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
conn.Unlock()

// Let it time out and get killed by the tx killer.
time.Sleep(1200 * time.Millisecond)

// Verify that the tx killer rand.
require.Equal(t, int64(1), txPool.env.Stats().KillCounters.Counts()["Transactions"]-startingKills)

// Regression test for #6727: make sure the tx limiter is decremented when the tx killer closes
// a transaction.
require.Equal(t,
[]fakeLimiterEntry{
{
immediate: im,
effective: ef,
isRelease: false,
},
{
immediate: im,
effective: ef,
isRelease: true,
},
}, limiter.Actions())
}

func newTxPool() (*TxPool, *fakeLimiter) {
return newTxPoolWithEnv(newEnv("TabletServerTest"))
}

func newTxPoolWithEnv(env tabletenv.Env) *TxPool {
limiter := &txlimiter.TxAllowAll{}
return NewTxPool(env, limiter)
func newTxPoolWithEnv(env tabletenv.Env) (*TxPool, *fakeLimiter) {
limiter := &fakeLimiter{}
return NewTxPool(env, limiter), limiter
}

func newEnv(exporterName string) tabletenv.Env {
Expand All @@ -376,27 +447,67 @@ func newEnv(exporterName string) tabletenv.Env {
return env
}

func setup(t *testing.T) (*fakesqldb.DB, *TxPool, func()) {
type fakeLimiterEntry struct {
immediate *querypb.VTGateCallerID
effective *vtrpcpb.CallerID
isRelease bool
}

type fakeLimiter struct {
actions []fakeLimiterEntry
mu sync.Mutex
}

func (fl *fakeLimiter) Get(immediate *querypb.VTGateCallerID, effective *vtrpcpb.CallerID) bool {
fl.mu.Lock()
defer fl.mu.Unlock()
fl.actions = append(fl.actions, fakeLimiterEntry{
immediate: immediate,
effective: effective,
isRelease: false,
})
return true
}

func (fl *fakeLimiter) Release(immediate *querypb.VTGateCallerID, effective *vtrpcpb.CallerID) {
fl.mu.Lock()
defer fl.mu.Unlock()
fl.actions = append(fl.actions, fakeLimiterEntry{
immediate: immediate,
effective: effective,
isRelease: true,
})
}

func (fl *fakeLimiter) Actions() []fakeLimiterEntry {
fl.mu.Lock()
defer fl.mu.Unlock()
result := make([]fakeLimiterEntry, len(fl.actions))
copy(result, fl.actions)
return result
}

func setup(t *testing.T) (*fakesqldb.DB, *TxPool, *fakeLimiter, func()) {
db := fakesqldb.New(t)
db.AddQueryPattern(".*", &sqltypes.Result{})

txPool := newTxPool()
txPool, limiter := newTxPool()
txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())

return db, txPool, func() {
return db, txPool, limiter, func() {
txPool.Close()
db.Close()
}
}

func setupWithEnv(t *testing.T, env tabletenv.Env) (*fakesqldb.DB, *TxPool, func()) {
func setupWithEnv(t *testing.T, env tabletenv.Env) (*fakesqldb.DB, *TxPool, *fakeLimiter, func()) {
db := fakesqldb.New(t)
db.AddQueryPattern(".*", &sqltypes.Result{})

txPool := newTxPoolWithEnv(env)
txPool, limiter := newTxPoolWithEnv(env)
txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())

return db, txPool, func() {
return db, txPool, limiter, func() {
txPool.Close()
db.Close()
}
Expand Down