diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 0a150e32fba..d196d1df9d9 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -296,16 +296,13 @@ func (qre *QueryExecutor) execDmlAutoCommit() (reply *sqltypes.Result, err error } func (qre *QueryExecutor) execAsTransaction(f func(conn *TxConnection) (*sqltypes.Result, error)) (reply *sqltypes.Result, err error) { - conn, err := qre.tsv.te.txPool.LocalBegin(qre.ctx, qre.options) + conn, beginSQL, err := qre.tsv.te.txPool.LocalBegin(qre.ctx, qre.options) if err != nil { return nil, err } defer qre.tsv.te.txPool.LocalConclude(qre.ctx, conn) - // - // A better future improvement would be for LocalBegin to return the set of - // executed statements to capture the isolation level setting as well. - if qre.options.GetTransactionIsolation() != querypb.ExecuteOptions_AUTOCOMMIT { - qre.logStats.AddRewrittenSQL("begin", time.Now()) + if beginSQL != "" { + qre.logStats.AddRewrittenSQL(beginSQL, time.Now()) } reply, err = f(conn) @@ -316,11 +313,11 @@ func (qre *QueryExecutor) execAsTransaction(f func(conn *TxConnection) (*sqltype qre.logStats.AddRewrittenSQL("rollback", start) return nil, err } - err = qre.tsv.te.txPool.LocalCommit(qre.ctx, conn, qre.tsv.messager) + commitSQL, err := qre.tsv.te.txPool.LocalCommit(qre.ctx, conn, qre.tsv.messager) // As above LocalCommit is a no-op for autocommmit so don't log anything. - if qre.options.GetTransactionIsolation() != querypb.ExecuteOptions_AUTOCOMMIT { - qre.logStats.AddRewrittenSQL("commit", start) + if commitSQL != "" { + qre.logStats.AddRewrittenSQL(commitSQL, start) } if err != nil { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 003b27708a0..8866b6e456f 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -237,12 +237,15 @@ type TxPoolController interface { // StopGently will change the state to NotServing but first wait for transactions to wrap up StopGently() - // Begin begins a transaction, and returns the associated transaction id. + // Begin begins a transaction, and returns the associated transaction id and the + // statement(s) used to execute the begin (if any). + // // Subsequent statements can access the connection through the transaction id. - Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, error) + Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error) - // Commit commits the specified transaction. - Commit(ctx context.Context, transactionID int64, mc messageCommitter) error + // Commit commits the specified transaction, returning the statement used to execute + // the commit or "" in autocommit settings. + Commit(ctx context.Context, transactionID int64, mc messageCommitter) (string, error) // Rollback rolls back the specified transaction. Rollback(ctx context.Context, transactionID int64) error @@ -773,13 +776,25 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, opti "Begin", "begin", nil, target, options, true /* isBegin */, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - defer tabletenv.QueryStats.Record("BEGIN", time.Now()) + startTime := time.Now() if tsv.txThrottler.Throttle() { // TODO(erez): I think this should be RESOURCE_EXHAUSTED. return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "Transaction throttled") } - transactionID, err = tsv.teCtrl.Begin(ctx, options) + var beginSQL string + transactionID, beginSQL, err = tsv.teCtrl.Begin(ctx, options) logStats.TransactionID = transactionID + + // Record the actual statements that were executed in the logStats. + // If nothing was actually executed, don't count the operation in + // the tablet metrics, and clear out the logStats Method so that + // handlePanicAndSendLogStats doesn't log the no-op. + logStats.OriginalSQL = beginSQL + if beginSQL != "" { + tabletenv.QueryStats.Record("BEGIN", startTime) + } else { + logStats.Method = "" + } return err }, ) @@ -793,9 +808,21 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra "Commit", "commit", nil, target, nil, false /* isBegin */, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - defer tabletenv.QueryStats.Record("COMMIT", time.Now()) + startTime := time.Now() logStats.TransactionID = transactionID - return tsv.teCtrl.Commit(ctx, transactionID, tsv.messager) + + var commitSQL string + commitSQL, err = tsv.teCtrl.Commit(ctx, transactionID, tsv.messager) + + // If nothing was actually executed, don't count the operation in + // the tablet metrics, and clear out the logStats Method so that + // handlePanicAndSendLogStats doesn't log the no-op. + if commitSQL != "" { + tabletenv.QueryStats.Record("COMMIT", startTime) + } else { + logStats.Method = "" + } + return err }, ) } @@ -1471,6 +1498,7 @@ func (tsv *TabletServer) handlePanicAndSendLogStats( // Examples where we don't send the log stats: // - ExecuteBatch() (logStats == nil) // - beginWaitForSameRangeTransactions() (Method == "") + // - Begin / Commit in autocommit mode if logStats != nil && logStats.Method != "" { logStats.Send() } @@ -1860,9 +1888,9 @@ func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.Real target := tsv.target tsv.mu.Unlock() shr := &querypb.StreamHealthResponse{ - Target: &target, - TabletAlias: &tsv.alias, - Serving: tsv.IsServing(), + Target: &target, + TabletAlias: &tsv.alias, + Serving: tsv.IsServing(), TabletExternallyReparentedTimestamp: terTimestamp, RealtimeStats: stats, } diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index cb928f9a934..9db9a56ef5e 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -264,22 +264,24 @@ func (te *TxEngine) AcceptReadOnly() error { } } -// Begin begins a transaction, and returns the associated transaction id. +// Begin begins a transaction, and returns the associated transaction id and the +// statement(s) used to execute the begin (if any). +// // Subsequent statements can access the connection through the transaction id. -func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, error) { +func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error) { te.stateLock.Lock() canOpenTransactions := te.state == AcceptingReadOnly || te.state == AcceptingReadAndWrite if !canOpenTransactions { // We are not in a state where we can start new transactions. Abort. te.stateLock.Unlock() - return 0, vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can't accept new transactions in state %v", te.state) + return 0, "", vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can't accept new transactions in state %v", te.state) } isWriteTransaction := options == nil || options.TransactionIsolation != querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY if te.state == AcceptingReadOnly && isWriteTransaction { te.stateLock.Unlock() - return 0, vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can only accept read-only transactions in current state") + return 0, "", vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can only accept read-only transactions in current state") } // By Add() to beginRequests, we block others from initiating state @@ -292,7 +294,7 @@ func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions) } // Commit commits the specified transaction. -func (te *TxEngine) Commit(ctx context.Context, transactionID int64, mc messageCommitter) error { +func (te *TxEngine) Commit(ctx context.Context, transactionID int64, mc messageCommitter) (string, error) { return te.txPool.Commit(ctx, transactionID, mc) } @@ -466,7 +468,7 @@ outer: if txid > maxid { maxid = txid } - conn, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + conn, _, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { allErr.RecordError(err) continue diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index cb7c235e41c..3c209a49bf4 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -55,10 +55,13 @@ func TestTxEngineClose(t *testing.T) { // Normal close with timeout wait. te.open() - c, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL: %q, want 'begin'", beginSQL) + } c.Recycle() start = time.Now() te.close(false) @@ -68,7 +71,7 @@ func TestTxEngineClose(t *testing.T) { // Immediate close. te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -82,7 +85,7 @@ func TestTxEngineClose(t *testing.T) { // Normal close with short grace period. te.shutdownGracePeriod = 250 * time.Millisecond te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -99,7 +102,7 @@ func TestTxEngineClose(t *testing.T) { // Normal close with short grace period, but pool gets empty early. te.shutdownGracePeriod = 250 * time.Millisecond te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -123,7 +126,7 @@ func TestTxEngineClose(t *testing.T) { // Immediate close, but connection is in use. te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -492,6 +495,6 @@ func startTransaction(te *TxEngine, writeTransaction bool) error { } else { options.TransactionIsolation = querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY } - _, err := te.Begin(context.Background(), options) + _, _, err := te.Begin(context.Background(), options) return err } diff --git a/go/vt/vttablet/tabletserver/tx_executor.go b/go/vt/vttablet/tabletserver/tx_executor.go index f9d00174c40..e485174745f 100644 --- a/go/vt/vttablet/tabletserver/tx_executor.go +++ b/go/vt/vttablet/tabletserver/tx_executor.go @@ -68,7 +68,7 @@ func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error { return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "prepare failed for transaction %d: %v", transactionID, err) } - localConn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + localConn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -79,7 +79,7 @@ func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error { return err } - err = txe.te.txPool.LocalCommit(txe.ctx, localConn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, localConn, txe.messager) if err != nil { return err } @@ -111,7 +111,7 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error { txe.markFailed(ctx, dtid) return err } - err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager) if err != nil { txe.markFailed(ctx, dtid) return err @@ -130,7 +130,7 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error { func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) { tabletenv.InternalErrors.Add("TwopcCommit", 1) txe.te.preparedPool.SetFailed(dtid) - conn, err := txe.te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err) return @@ -142,7 +142,7 @@ func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) { return } - if err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager); err != nil { + if _, err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager); err != nil { log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err) } } @@ -170,7 +170,7 @@ func (txe *TxExecutor) RollbackPrepared(dtid string, originalID int64) error { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } defer tabletenv.QueryStats.Record("ROLLBACK_PREPARED", time.Now()) - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { goto returnConn } @@ -181,7 +181,7 @@ func (txe *TxExecutor) RollbackPrepared(dtid string, originalID int64) error { goto returnConn } - err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) returnConn: if preparedConn := txe.te.preparedPool.FetchForRollback(dtid); preparedConn != nil { @@ -200,7 +200,7 @@ func (txe *TxExecutor) CreateTransaction(dtid string, participants []*querypb.Ta return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } defer tabletenv.QueryStats.Record("CREATE_TRANSACTION", time.Now()) - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -210,7 +210,8 @@ func (txe *TxExecutor) CreateTransaction(dtid string, participants []*querypb.Ta if err != nil { return err } - return txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + return err } // StartCommit atomically commits the transaction along with the @@ -232,7 +233,8 @@ func (txe *TxExecutor) StartCommit(transactionID int64, dtid string) error { if err != nil { return err } - return txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + return err } // SetRollback transitions the 2pc transaction to the Rollback state. @@ -248,7 +250,7 @@ func (txe *TxExecutor) SetRollback(dtid string, transactionID int64) error { txe.te.txPool.Rollback(txe.ctx, transactionID) } - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -259,7 +261,7 @@ func (txe *TxExecutor) SetRollback(dtid string, transactionID int64) error { return err } - err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) if err != nil { return err } @@ -275,7 +277,7 @@ func (txe *TxExecutor) ConcludeTransaction(dtid string) error { } defer tabletenv.QueryStats.Record("RESOLVE", time.Now()) - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -285,7 +287,8 @@ func (txe *TxExecutor) ConcludeTransaction(dtid string) error { if err != nil { return err } - return txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + return err } // ReadTransaction returns the metadata for the sepcified dtid. diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go index 4d766643792..5f068fba724 100644 --- a/go/vt/vttablet/tabletserver/tx_pool.go +++ b/go/vt/vttablet/tabletserver/tx_pool.go @@ -192,23 +192,26 @@ func (axp *TxPool) WaitForEmpty() { axp.activePool.WaitForEmpty() } -// Begin begins a transaction, and returns the associated transaction id. +// Begin begins a transaction, and returns the associated transaction id and +// the statements (if any) executed to initiate the transaction. In autocommit +// mode the statement will be "". +// // Subsequent statements can access the connection through the transaction id. -func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, error) { +func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error) { var conn *connpool.DBConn var err error immediateCaller := callerid.ImmediateCallerIDFromContext(ctx) effectiveCaller := callerid.EffectiveCallerIDFromContext(ctx) if !axp.limiter.Get(immediateCaller, effectiveCaller) { - return 0, vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "per-user transaction pool connection limit exceeded") + return 0, "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "per-user transaction pool connection limit exceeded") } waiterCount := axp.waiters.Add(1) defer axp.waiters.Add(-1) if waiterCount > axp.waiterCap.Get() { - return 0, vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool waiter count exceeded") + return 0, "", vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool waiter count exceeded") } var beginSucceeded bool @@ -231,30 +234,33 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) ( if err != nil { switch err { case connpool.ErrConnPoolClosed: - return 0, err + return 0, "", err case pools.ErrTimeout: axp.LogActive() - return 0, vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool connection limit exceeded") + return 0, "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool connection limit exceeded") } - return 0, err + return 0, "", err } autocommitTransaction := false - + beginQueries := "" if queries, ok := txIsolations[options.GetTransactionIsolation()]; ok { if queries.setIsolationLevel != "" { if _, err := conn.Exec(ctx, "set transaction isolation level "+queries.setIsolationLevel, 1, false); err != nil { - return 0, err + return 0, "", err } + + beginQueries = queries.setIsolationLevel + "; " } if _, err := conn.Exec(ctx, queries.openTransaction, 1, false); err != nil { - return 0, err + return 0, "", err } + beginQueries = beginQueries + queries.openTransaction } else if options.GetTransactionIsolation() == querypb.ExecuteOptions_AUTOCOMMIT { autocommitTransaction = true } else { - return 0, fmt.Errorf("don't know how to open a transaction of this type: %v", options.GetTransactionIsolation()) + return 0, "", fmt.Errorf("don't know how to open a transaction of this type: %v", options.GetTransactionIsolation()) } beginSucceeded = true @@ -271,14 +277,14 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) ( ), options.GetWorkload() != querypb.ExecuteOptions_DBA, ) - return transactionID, nil + return transactionID, beginQueries, nil } // Commit commits the specified transaction. -func (axp *TxPool) Commit(ctx context.Context, transactionID int64, mc messageCommitter) error { +func (axp *TxPool) Commit(ctx context.Context, transactionID int64, mc messageCommitter) (string, error) { conn, err := axp.Get(transactionID, "for commit") if err != nil { - return err + return "", err } return axp.LocalCommit(ctx, conn, mc) } @@ -305,30 +311,31 @@ func (axp *TxPool) Get(transactionID int64, reason string) (*TxConnection, error // LocalBegin is equivalent to Begin->Get. // It's used for executing transactions within a request. It's safe // to always call LocalConclude at the end. -func (axp *TxPool) LocalBegin(ctx context.Context, options *querypb.ExecuteOptions) (*TxConnection, error) { - transactionID, err := axp.Begin(ctx, options) +func (axp *TxPool) LocalBegin(ctx context.Context, options *querypb.ExecuteOptions) (*TxConnection, string, error) { + transactionID, beginSQL, err := axp.Begin(ctx, options) if err != nil { - return nil, err + return nil, "", err } - return axp.Get(transactionID, "for local query") + conn, err := axp.Get(transactionID, "for local query") + return conn, beginSQL, err } // LocalCommit is the commit function for LocalBegin. -func (axp *TxPool) LocalCommit(ctx context.Context, conn *TxConnection, mc messageCommitter) error { +func (axp *TxPool) LocalCommit(ctx context.Context, conn *TxConnection, mc messageCommitter) (string, error) { defer conn.conclude(TxCommit, "transaction committed") defer mc.LockDB(conn.NewMessages, conn.ChangedMessages)() if conn.Autocommit { mc.UpdateCaches(conn.NewMessages, conn.ChangedMessages) - return nil + return "", nil } if _, err := conn.Exec(ctx, "commit", 1, false); err != nil { conn.Close() - return err + return "", err } mc.UpdateCaches(conn.NewMessages, conn.ChangedMessages) - return nil + return "commit", nil } // LocalConclude concludes a transaction started by LocalBegin. diff --git a/go/vt/vttablet/tabletserver/tx_pool_test.go b/go/vt/vttablet/tabletserver/tx_pool_test.go index d58b0357319..b1d54686b1d 100644 --- a/go/vt/vttablet/tabletserver/tx_pool_test.go +++ b/go/vt/vttablet/tabletserver/tx_pool_test.go @@ -40,6 +40,42 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/messager" ) +func TestTxPoolExecuteCommit(t *testing.T) { + sql := "update test_column set x=1 where 1!=1" + db := fakesqldb.New(t) + defer db.Close() + db.AddQuery(sql, &sqltypes.Result{}) + db.AddQuery("begin", &sqltypes.Result{}) + db.AddQuery("commit", &sqltypes.Result{}) + + txPool := newTxPool() + txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) + defer txPool.Close() + ctx := context.Background() + transactionID, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + if err != nil { + t.Fatal(err) + } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } + txConn, err := txPool.Get(transactionID, "for query") + if err != nil { + t.Fatal(err) + } + txConn.RecordQuery(sql) + _, err = txConn.Exec(ctx, sql, 1, true) + txConn.Recycle() + + commitSQL, err := txPool.Commit(ctx, transactionID, &fakeMessageCommitter{}) + if err != nil { + t.Fatal(err) + } + if commitSQL != "commit" { + t.Errorf("commitSQL got %q want 'commit'", commitSQL) + } +} + func TestTxPoolExecuteRollback(t *testing.T) { sql := "alter table test_table add test_column int" db := fakesqldb.New(t) @@ -52,10 +88,13 @@ func TestTxPoolExecuteRollback(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - transactionID, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + transactionID, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } txConn, err := txPool.Get(transactionID, "for query") if err != nil { t.Fatal(err) @@ -79,11 +118,11 @@ func TestTxPoolRollbackNonBusy(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - txid1, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + txid1, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } - _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + _, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -173,7 +212,7 @@ func TestTxPoolTransactionKillerEnforceTimeoutEnabled(t *testing.T) { } func addQuery(ctx context.Context, sql string, txPool *TxPool, workload querypb.ExecuteOptions_Workload) (int64, error) { - transactionID, err := txPool.Begin(ctx, &querypb.ExecuteOptions{Workload: workload}) + transactionID, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{Workload: workload}) if err != nil { return 0, err } @@ -199,10 +238,13 @@ func TestTxPoolClientRowsFound(t *testing.T) { // Start a 'normal' transaction. It should take a connection // for the normal 'conns' pool. - id1, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id1, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } if got, want := txPool.conns.Available(), startNormalSize-1; got != want { t.Errorf("Normal pool size: %d, want %d", got, want) } @@ -212,10 +254,13 @@ func TestTxPoolClientRowsFound(t *testing.T) { // Start a 'foundRows' transaction. It should take a connection // from the foundRows pool. - id2, err := txPool.Begin(ctx, &querypb.ExecuteOptions{ClientFoundRows: true}) + id2, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{ClientFoundRows: true}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } if got, want := txPool.conns.Available(), startNormalSize-1; got != want { t.Errorf("Normal pool size: %d, want %d", got, want) } @@ -253,16 +298,23 @@ func TestTxPoolTransactionIsolation(t *testing.T) { ctx := context.Background() // Start a transaction with default. It should not change isolation. - _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + _, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } db.AddQuery("set transaction isolation level READ COMMITTED", &sqltypes.Result{}) - _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_READ_COMMITTED}) + _, beginSQL, err = txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_READ_COMMITTED}) if err != nil { t.Fatal(err) } + wantBeginSQL := "READ COMMITTED; begin" + if beginSQL != wantBeginSQL { + t.Errorf("beginSQL got %q want %q", beginSQL, wantBeginSQL) + } } func TestTxPoolAutocommit(t *testing.T) { @@ -276,14 +328,20 @@ func TestTxPoolAutocommit(t *testing.T) { // to mysql. // This test is meaningful because if txPool.Begin were to send a BEGIN statement to the connection, it will fatal // because is not in the list of expected queries (i.e db.AddQuery hasn't been called). - txid, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT}) + txid, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT}) if err != nil { t.Fatal(err) } - err = txPool.Commit(ctx, txid, &fakeMessageCommitter{}) + if beginSQL != "" { + t.Errorf("beginSQL got %q want ''", beginSQL) + } + commitSQL, err := txPool.Commit(ctx, txid, &fakeMessageCommitter{}) if err != nil { t.Fatal(err) } + if commitSQL != "" { + t.Errorf("commitSQL got %q want ''", commitSQL) + } } // TestTxPoolBeginWithPoolConnectionError_TransientErrno2006 tests the case @@ -305,7 +363,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2006_Transient(t *testing.T) { } ctx := context.Background() - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatalf("Begin should have succeeded after the retry in DBConn.Exec(): %v", err) } @@ -336,7 +394,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2006_Permanent(t *testing.T) { // After that, vttablet will automatically try to reconnect and this fail. // DBConn.Exec() will return the reconnect error as final error and not the // initial connection error. - _, err = txPool.LocalBegin(context.Background(), &querypb.ExecuteOptions{}) + _, _, err = txPool.LocalBegin(context.Background(), &querypb.ExecuteOptions{}) if err == nil || !strings.Contains(err.Error(), "(errno 2013)") { t.Fatalf("Begin did not return the reconnect error: %v", err) } @@ -362,7 +420,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2013(t *testing.T) { db.EnableShouldClose() // 2013 is not retryable. DBConn.Exec() fails after the first attempt. - _, err = txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) + _, _, err = txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) if err == nil || !strings.Contains(err.Error(), "(errno 2013)") { t.Fatalf("Begin must return connection error with MySQL errno 2013: %v", err) } @@ -385,7 +443,7 @@ func primeTxPoolWithConnection(t *testing.T) (*fakesqldb.DB, *TxPool, error) { db.AddQuery("begin", &sqltypes.Result{}) db.AddQuery("rollback", &sqltypes.Result{}) ctx := context.Background() - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { return nil, nil, err } @@ -402,7 +460,7 @@ func TestTxPoolBeginWithError(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) want := "error: rejected" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("Begin: %v, want %s", err, want) @@ -424,7 +482,7 @@ func TestTxPoolRollbackFail(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - transactionID, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + transactionID, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -467,7 +525,7 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { db.AddQuery("rollback", &sqltypes.Result{}) txPool := newTxPool() txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) - id, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) txPool.Close() assertErrorMatch := func(id int64, reason string) { @@ -486,14 +544,14 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { txPool = newTxPool() txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) - if err := txPool.Commit(ctx, id, &fakeMessageCommitter{}); err != nil { + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + if _, err := txPool.Commit(ctx, id, &fakeMessageCommitter{}); err != nil { t.Fatalf("got error: %v", err) } assertErrorMatch(id, "transaction committed") - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err := txPool.Rollback(ctx, id); err != nil { t.Fatalf("got error: %v", err) } @@ -506,13 +564,13 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) time.Sleep(5 * time.Millisecond) assertErrorMatch(id, "exceeded timeout: 1ms") txPool.SetTimeout(1 * time.Hour) - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) txc, err := txPool.Get(id, "for close") if err != nil { t.Fatalf("got error: %v", err) @@ -545,7 +603,7 @@ func TestTxPoolExecFailDueToConnFail_Errno2006(t *testing.T) { ctx := context.Background() // Start the transaction. - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -583,7 +641,7 @@ func TestTxPoolExecFailDueToConnFail_Errno2013(t *testing.T) { ctx := context.Background() // Start the transaction. - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -615,7 +673,7 @@ func TestTxPoolCloseKillsStrayTransactions(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) // Start stray transaction. - _, err := txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) + _, _, err := txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) }