Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions go/vt/vttablet/tabletserver/connpool/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,6 @@ import (
querypb "vitess.io/vitess/go/vt/proto/query"
)

// BinlogFormat is used for specifying the binlog format.
type BinlogFormat int

// The following constants specify the possible binlog format values.
const (
BinlogFormatStatement BinlogFormat = iota
BinlogFormatRow
BinlogFormatMixed
)

// DBConn is a db connection for tabletserver.
// It performs automatic reconnects as needed.
// Its Execute function has a timeout that can kill
Expand Down Expand Up @@ -84,7 +74,7 @@ func NewDBConn(ctx context.Context, cp *Pool, appParams dbconfigs.Connector) (*D
}

// NewDBConnNoPool creates a new DBConn without a pool.
func NewDBConnNoPool(ctx context.Context, params dbconfigs.Connector, dbaPool *dbconnpool.ConnectionPool, stats *tabletenv.Stats) (*DBConn, error) {
func NewDBConnNoPool(ctx context.Context, params dbconfigs.Connector, dbaPool *dbconnpool.ConnectionPool) (*DBConn, error) {
c, err := dbconnpool.NewDBConnection(ctx, params)
if err != nil {
return nil, err
Expand Down
4 changes: 1 addition & 3 deletions go/vt/vttablet/tabletserver/connpool/dbconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import (
"vitess.io/vitess/go/sqltypes"

querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

func compareTimingCounts(t *testing.T, op string, delta int64, before, after map[string]int64) {
Expand Down Expand Up @@ -238,7 +236,7 @@ func TestDBNoPoolConnKill(t *testing.T) {
connPool := newPool()
connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
defer connPool.Close()
dbConn, err := NewDBConnNoPool(context.Background(), db.ConnParams(), connPool.dbaPool, tabletenv.NewStats(servenv.NewExporter("Test", "Tablet")))
dbConn, err := NewDBConnNoPool(context.Background(), db.ConnParams(), connPool.dbaPool)
if dbConn != nil {
defer dbConn.Close()
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/connpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (cp *Pool) Get(ctx context.Context) (*DBConn, error) {
}

if cp.isCallerIDAppDebug(ctx) {
return NewDBConnNoPool(ctx, cp.appDebugParams, cp.dbaPool, cp.env.Stats())
return NewDBConnNoPool(ctx, cp.appDebugParams, cp.dbaPool)
}
p := cp.pool()
if p == nil {
Expand Down
132 changes: 65 additions & 67 deletions go/vt/vttablet/tabletserver/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package tabletserver

import (
"fmt"
"net/url"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -49,11 +48,10 @@ const (
TxClose = "close"
TxCommit = "commit"
TxRollback = "rollback"
TxPrepare = "prepare"
TxKill = "kill"
)

const txLogInterval = time.Duration(1 * time.Minute)
const txLogInterval = 1 * time.Minute

type queries struct {
setIsolationLevel string
Expand Down Expand Up @@ -120,79 +118,79 @@ func NewTxPool(env tabletenv.Env, limiter txlimiter.TxLimiter) *TxPool {

// Open makes the TxPool operational. This also starts the transaction killer
// that will kill long-running transactions.
func (axp *TxPool) Open(appParams, dbaParams, appDebugParams dbconfigs.Connector) {
log.Infof("Starting transaction id: %d", axp.lastID)
axp.conns.Open(appParams, dbaParams, appDebugParams)
func (tp *TxPool) Open(appParams, dbaParams, appDebugParams dbconfigs.Connector) {
log.Infof("Starting transaction id: %d", tp.lastID)
tp.conns.Open(appParams, dbaParams, appDebugParams)
foundRowsParam, _ := appParams.MysqlParams()
foundRowsParam.EnableClientFoundRows()
appParams = dbconfigs.New(foundRowsParam)
axp.foundRowsPool.Open(appParams, dbaParams, appDebugParams)
axp.ticks.Start(func() { axp.transactionKiller() })
tp.foundRowsPool.Open(appParams, dbaParams, appDebugParams)
tp.ticks.Start(func() { tp.transactionKiller() })
}

// Close closes the TxPool. A closed pool can be reopened.
func (axp *TxPool) Close() {
axp.ticks.Stop()
for _, v := range axp.activePool.GetOutdated(time.Duration(0), "for closing") {
func (tp *TxPool) Close() {
tp.ticks.Stop()
for _, v := range tp.activePool.GetOutdated(time.Duration(0), "for closing") {
conn := v.(*TxConnection)
log.Warningf("killing transaction for shutdown: %s", conn.Format(nil))
axp.env.Stats().InternalErrors.Add("StrayTransactions", 1)
log.Warningf("killing transaction for shutdown: %s", conn.Format())
tp.env.Stats().InternalErrors.Add("StrayTransactions", 1)
conn.Close()
conn.conclude(TxClose, "pool closed")
}
axp.conns.Close()
axp.foundRowsPool.Close()
tp.conns.Close()
tp.foundRowsPool.Close()
}

// AdjustLastID adjusts the last transaction id to be at least
// as large as the input value. This will ensure that there are
// no dtid collisions with future transactions.
func (axp *TxPool) AdjustLastID(id int64) {
if current := axp.lastID.Get(); current < id {
func (tp *TxPool) AdjustLastID(id int64) {
if current := tp.lastID.Get(); current < id {
log.Infof("Adjusting transaction id to: %d", id)
axp.lastID.Set(id)
tp.lastID.Set(id)
}
}

// RollbackNonBusy rolls back all transactions that are not in use.
// Transactions can be in use for situations like executing statements
// or in prepared state.
func (axp *TxPool) RollbackNonBusy(ctx context.Context) {
for _, v := range axp.activePool.GetOutdated(time.Duration(0), "for transition") {
axp.LocalConclude(ctx, v.(*TxConnection))
func (tp *TxPool) RollbackNonBusy(ctx context.Context) {
for _, v := range tp.activePool.GetOutdated(time.Duration(0), "for transition") {
tp.LocalConclude(ctx, v.(*TxConnection))
}
}

func (axp *TxPool) transactionKiller() {
defer axp.env.LogError()
for _, v := range axp.activePool.GetOutdated(time.Duration(axp.Timeout()), "for tx killer rollback") {
func (tp *TxPool) transactionKiller() {
defer tp.env.LogError()
for _, v := range tp.activePool.GetOutdated(tp.Timeout(), "for tx killer rollback") {
conn := v.(*TxConnection)
log.Warningf("killing transaction (exceeded timeout: %v): %s", axp.Timeout(), conn.Format(nil))
axp.env.Stats().KillCounters.Add("Transactions", 1)
log.Warningf("killing transaction (exceeded timeout: %v): %s", tp.Timeout(), conn.Format())
tp.env.Stats().KillCounters.Add("Transactions", 1)
conn.Close()
conn.conclude(TxKill, fmt.Sprintf("exceeded timeout: %v", axp.Timeout()))
conn.conclude(TxKill, fmt.Sprintf("exceeded timeout: %v", tp.Timeout()))
}
}

// WaitForEmpty waits until all active transactions are completed.
func (axp *TxPool) WaitForEmpty() {
axp.activePool.WaitForEmpty()
func (tp *TxPool) WaitForEmpty() {
tp.activePool.WaitForEmpty()
}

// Begin begins a transaction, and returns the associated transaction id and
// the statements (if any) executed to initiate the transaction. In autocommit
// mode the statement will be "".
//
// Subsequent statements can access the connection through the transaction id.
func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error) {
func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error) {
span, ctx := trace.NewSpan(ctx, "TxPool.Begin")
defer span.Finish()
var conn *connpool.DBConn
var err error
immediateCaller := callerid.ImmediateCallerIDFromContext(ctx)
effectiveCaller := callerid.EffectiveCallerIDFromContext(ctx)

if !axp.limiter.Get(immediateCaller, effectiveCaller) {
if !tp.limiter.Get(immediateCaller, effectiveCaller) {
return 0, "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "per-user transaction pool connection limit exceeded")
}

Expand All @@ -205,23 +203,23 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (
if conn != nil {
conn.Recycle()
}
axp.limiter.Release(immediateCaller, effectiveCaller)
tp.limiter.Release(immediateCaller, effectiveCaller)
}()

if options.GetClientFoundRows() {
conn, err = axp.foundRowsPool.Get(ctx)
conn, err = tp.foundRowsPool.Get(ctx)
} else {
conn, err = axp.conns.Get(ctx)
conn, err = tp.conns.Get(ctx)
}
if err != nil {
switch err {
case connpool.ErrConnPoolClosed:
return 0, "", err
case pools.ErrCtxTimeout:
axp.LogActive()
tp.LogActive()
return 0, "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool aborting request due to already expired context")
case pools.ErrTimeout:
axp.LogActive()
tp.LogActive()
return 0, "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool connection limit exceeded")
}
return 0, "", err
Expand Down Expand Up @@ -249,13 +247,13 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (
}

beginSucceeded = true
transactionID := axp.lastID.Add(1)
axp.activePool.Register(
transactionID := tp.lastID.Add(1)
tp.activePool.Register(
transactionID,
newTxConnection(
conn,
transactionID,
axp,
tp,
immediateCaller,
effectiveCaller,
autocommitTransaction,
Expand All @@ -266,32 +264,32 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (
}

// Commit commits the specified transaction.
func (axp *TxPool) Commit(ctx context.Context, transactionID int64) (string, error) {
func (tp *TxPool) Commit(ctx context.Context, transactionID int64) (string, error) {
span, ctx := trace.NewSpan(ctx, "TxPool.Commit")
defer span.Finish()
conn, err := axp.Get(transactionID, "for commit")
conn, err := tp.Get(transactionID, "for commit")
if err != nil {
return "", err
}
return axp.LocalCommit(ctx, conn)
return tp.LocalCommit(ctx, conn)
}

// Rollback rolls back the specified transaction.
func (axp *TxPool) Rollback(ctx context.Context, transactionID int64) error {
func (tp *TxPool) Rollback(ctx context.Context, transactionID int64) error {
span, ctx := trace.NewSpan(ctx, "TxPool.Rollback")
defer span.Finish()

conn, err := axp.Get(transactionID, "for rollback")
conn, err := tp.Get(transactionID, "for rollback")
if err != nil {
return err
}
return axp.localRollback(ctx, conn)
return tp.localRollback(ctx, conn)
}

// Get fetches the connection associated to the transactionID.
// You must call Recycle on TxConnection once done.
func (axp *TxPool) Get(transactionID int64, reason string) (*TxConnection, error) {
v, err := axp.activePool.Get(transactionID, reason)
func (tp *TxPool) Get(transactionID int64, reason string) (*TxConnection, error) {
v, err := tp.activePool.Get(transactionID, reason)
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_ABORTED, "transaction %d: %v", transactionID, err)
}
Expand All @@ -301,20 +299,20 @@ func (axp *TxPool) Get(transactionID int64, reason string) (*TxConnection, error
// LocalBegin is equivalent to Begin->Get.
// It's used for executing transactions within a request. It's safe
// to always call LocalConclude at the end.
func (axp *TxPool) LocalBegin(ctx context.Context, options *querypb.ExecuteOptions) (*TxConnection, string, error) {
func (tp *TxPool) LocalBegin(ctx context.Context, options *querypb.ExecuteOptions) (*TxConnection, string, error) {
span, ctx := trace.NewSpan(ctx, "TxPool.LocalBegin")
defer span.Finish()

transactionID, beginSQL, err := axp.Begin(ctx, options)
transactionID, beginSQL, err := tp.Begin(ctx, options)
if err != nil {
return nil, "", err
}
conn, err := axp.Get(transactionID, "for local query")
conn, err := tp.Get(transactionID, "for local query")
return conn, beginSQL, err
}

// LocalCommit is the commit function for LocalBegin.
func (axp *TxPool) LocalCommit(ctx context.Context, conn *TxConnection) (string, error) {
func (tp *TxPool) LocalCommit(ctx context.Context, conn *TxConnection) (string, error) {
span, ctx := trace.NewSpan(ctx, "TxPool.LocalCommit")
defer span.Finish()
defer conn.conclude(TxCommit, "transaction committed")
Expand All @@ -332,16 +330,16 @@ func (axp *TxPool) LocalCommit(ctx context.Context, conn *TxConnection) (string,

// LocalConclude concludes a transaction started by LocalBegin.
// If the transaction was not previously concluded, it's rolled back.
func (axp *TxPool) LocalConclude(ctx context.Context, conn *TxConnection) {
func (tp *TxPool) LocalConclude(ctx context.Context, conn *TxConnection) {
if conn.dbConn == nil {
return
}
span, ctx := trace.NewSpan(ctx, "TxPool.LocalConclude")
defer span.Finish()
_ = axp.localRollback(ctx, conn)
_ = tp.localRollback(ctx, conn)
}

func (axp *TxPool) localRollback(ctx context.Context, conn *TxConnection) error {
func (tp *TxPool) localRollback(ctx context.Context, conn *TxConnection) error {
if conn.Autocommit {
conn.conclude(TxCommit, "returned to pool")
return nil
Expand All @@ -356,28 +354,28 @@ func (axp *TxPool) localRollback(ctx context.Context, conn *TxConnection) error

// LogActive causes all existing transactions to be logged when they complete.
// The logging is throttled to no more than once every txLogInterval.
func (axp *TxPool) LogActive() {
axp.logMu.Lock()
defer axp.logMu.Unlock()
if time.Since(axp.lastLog) < txLogInterval {
func (tp *TxPool) LogActive() {
tp.logMu.Lock()
defer tp.logMu.Unlock()
if time.Since(tp.lastLog) < txLogInterval {
return
}
axp.lastLog = time.Now()
conns := axp.activePool.GetAll()
tp.lastLog = time.Now()
conns := tp.activePool.GetAll()
for _, c := range conns {
c.(*TxConnection).LogToFile.Set(1)
}
}

// Timeout returns the transaction timeout.
func (axp *TxPool) Timeout() time.Duration {
return axp.transactionTimeout.Get()
func (tp *TxPool) Timeout() time.Duration {
return tp.transactionTimeout.Get()
}

// SetTimeout sets the transaction timeout.
func (axp *TxPool) SetTimeout(timeout time.Duration) {
axp.transactionTimeout.Set(timeout)
axp.ticks.SetInterval(timeout / 10)
func (tp *TxPool) SetTimeout(timeout time.Duration) {
tp.transactionTimeout.Set(timeout)
tp.ticks.SetInterval(timeout / 10)
}

// TxConnection is meant for executing transactions. It can return itself to
Expand Down Expand Up @@ -493,7 +491,7 @@ func (txc *TxConnection) log(conclusion string) {
txc.pool.env.Stats().UserTransactionTimesNs.Add([]string{username, conclusion}, int64(duration))
txc.pool.txStats.Add(conclusion, duration)
if txc.LogToFile.Get() != 0 {
log.Infof("Logged transaction: %s", txc.Format(nil))
log.Infof("Logged transaction: %s", txc.Format())
}
tabletenv.TxLogger.Send(txc)
}
Expand All @@ -504,7 +502,7 @@ func (txc *TxConnection) EventTime() time.Time {
}

// Format returns a printable version of the connection info.
func (txc *TxConnection) Format(params url.Values) string {
func (txc *TxConnection) Format() string {
return fmt.Sprintf(
"%v\t'%v'\t'%v'\t%v\t%v\t%.6f\t%v\t%v\t\n",
txc.TransactionID,
Expand Down