From 6576d18eceb744851ebb566dbaf2ca81bb2f16c2 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Fri, 19 Apr 2019 10:30:21 -0700 Subject: [PATCH 1/2] properly suppress begin...commit in autocommit logs The previous (untested) implementation turned out to be in the wrong place in the TabletServer execution tier and did not properly log the actual statements being executed. Implement this the right way by returning the statements that were really executed out from the TxPool, then using those to determine whether or not to log the statement. Signed-off-by: Michael Demmer --- go/vt/vttablet/tabletserver/query_executor.go | 15 +-- go/vt/vttablet/tabletserver/tabletserver.go | 41 +++++-- go/vt/vttablet/tabletserver/tx_engine.go | 14 ++- go/vt/vttablet/tabletserver/tx_engine_test.go | 15 ++- go/vt/vttablet/tabletserver/tx_executor.go | 31 ++--- go/vt/vttablet/tabletserver/tx_pool.go | 51 +++++---- go/vt/vttablet/tabletserver/tx_pool_test.go | 108 ++++++++++++++---- 7 files changed, 184 insertions(+), 91 deletions(-) diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 0a150e32fba..d196d1df9d9 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -296,16 +296,13 @@ func (qre *QueryExecutor) execDmlAutoCommit() (reply *sqltypes.Result, err error } func (qre *QueryExecutor) execAsTransaction(f func(conn *TxConnection) (*sqltypes.Result, error)) (reply *sqltypes.Result, err error) { - conn, err := qre.tsv.te.txPool.LocalBegin(qre.ctx, qre.options) + conn, beginSQL, err := qre.tsv.te.txPool.LocalBegin(qre.ctx, qre.options) if err != nil { return nil, err } defer qre.tsv.te.txPool.LocalConclude(qre.ctx, conn) - // - // A better future improvement would be for LocalBegin to return the set of - // executed statements to capture the isolation level setting as well. - if qre.options.GetTransactionIsolation() != querypb.ExecuteOptions_AUTOCOMMIT { - qre.logStats.AddRewrittenSQL("begin", time.Now()) + if beginSQL != "" { + qre.logStats.AddRewrittenSQL(beginSQL, time.Now()) } reply, err = f(conn) @@ -316,11 +313,11 @@ func (qre *QueryExecutor) execAsTransaction(f func(conn *TxConnection) (*sqltype qre.logStats.AddRewrittenSQL("rollback", start) return nil, err } - err = qre.tsv.te.txPool.LocalCommit(qre.ctx, conn, qre.tsv.messager) + commitSQL, err := qre.tsv.te.txPool.LocalCommit(qre.ctx, conn, qre.tsv.messager) // As above LocalCommit is a no-op for autocommmit so don't log anything. - if qre.options.GetTransactionIsolation() != querypb.ExecuteOptions_AUTOCOMMIT { - qre.logStats.AddRewrittenSQL("commit", start) + if commitSQL != "" { + qre.logStats.AddRewrittenSQL(commitSQL, start) } if err != nil { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 003b27708a0..662bd5c7b66 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -237,12 +237,15 @@ type TxPoolController interface { // StopGently will change the state to NotServing but first wait for transactions to wrap up StopGently() - // Begin begins a transaction, and returns the associated transaction id. + // Begin begins a transaction, and returns the associated transaction id and the + // statement(s) used to execute the begin (if any). + // // Subsequent statements can access the connection through the transaction id. - Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, error) + Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error) - // Commit commits the specified transaction. - Commit(ctx context.Context, transactionID int64, mc messageCommitter) error + // Commit commits the specified transaction, returning the statement used to execute + // the commit or "" in autocommit settings. + Commit(ctx context.Context, transactionID int64, mc messageCommitter) (string, error) // Rollback rolls back the specified transaction. Rollback(ctx context.Context, transactionID int64) error @@ -778,8 +781,17 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, opti // TODO(erez): I think this should be RESOURCE_EXHAUSTED. return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "Transaction throttled") } - transactionID, err = tsv.teCtrl.Begin(ctx, options) + var beginSQL string + transactionID, beginSQL, err = tsv.teCtrl.Begin(ctx, options) logStats.TransactionID = transactionID + + // Record the actual statements that were executed in the logStats. + // If nothing was actually executed, clear out the Method so that + // handlePanicAndSendLogStats doesn't log the no-op. + logStats.OriginalSQL = beginSQL + if beginSQL == "" { + logStats.Method = "" + } return err }, ) @@ -795,7 +807,17 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tabletenv.QueryStats.Record("COMMIT", time.Now()) logStats.TransactionID = transactionID - return tsv.teCtrl.Commit(ctx, transactionID, tsv.messager) + + var commitSQL string + commitSQL, err = tsv.teCtrl.Commit(ctx, transactionID, tsv.messager) + + // If nothing was actually executed, clear out the Method so that + // handlePanicAndSendLogStats doesn't log the no-op. + if commitSQL == "" { + logStats.Method = "" + } + + return err }, ) } @@ -1471,6 +1493,7 @@ func (tsv *TabletServer) handlePanicAndSendLogStats( // Examples where we don't send the log stats: // - ExecuteBatch() (logStats == nil) // - beginWaitForSameRangeTransactions() (Method == "") + // - Begin / Commit in autocommit mode if logStats != nil && logStats.Method != "" { logStats.Send() } @@ -1860,9 +1883,9 @@ func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.Real target := tsv.target tsv.mu.Unlock() shr := &querypb.StreamHealthResponse{ - Target: &target, - TabletAlias: &tsv.alias, - Serving: tsv.IsServing(), + Target: &target, + TabletAlias: &tsv.alias, + Serving: tsv.IsServing(), TabletExternallyReparentedTimestamp: terTimestamp, RealtimeStats: stats, } diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index cb928f9a934..9db9a56ef5e 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -264,22 +264,24 @@ func (te *TxEngine) AcceptReadOnly() error { } } -// Begin begins a transaction, and returns the associated transaction id. +// Begin begins a transaction, and returns the associated transaction id and the +// statement(s) used to execute the begin (if any). +// // Subsequent statements can access the connection through the transaction id. -func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, error) { +func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error) { te.stateLock.Lock() canOpenTransactions := te.state == AcceptingReadOnly || te.state == AcceptingReadAndWrite if !canOpenTransactions { // We are not in a state where we can start new transactions. Abort. te.stateLock.Unlock() - return 0, vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can't accept new transactions in state %v", te.state) + return 0, "", vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can't accept new transactions in state %v", te.state) } isWriteTransaction := options == nil || options.TransactionIsolation != querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY if te.state == AcceptingReadOnly && isWriteTransaction { te.stateLock.Unlock() - return 0, vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can only accept read-only transactions in current state") + return 0, "", vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can only accept read-only transactions in current state") } // By Add() to beginRequests, we block others from initiating state @@ -292,7 +294,7 @@ func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions) } // Commit commits the specified transaction. -func (te *TxEngine) Commit(ctx context.Context, transactionID int64, mc messageCommitter) error { +func (te *TxEngine) Commit(ctx context.Context, transactionID int64, mc messageCommitter) (string, error) { return te.txPool.Commit(ctx, transactionID, mc) } @@ -466,7 +468,7 @@ outer: if txid > maxid { maxid = txid } - conn, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + conn, _, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { allErr.RecordError(err) continue diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index cb7c235e41c..3c209a49bf4 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -55,10 +55,13 @@ func TestTxEngineClose(t *testing.T) { // Normal close with timeout wait. te.open() - c, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err := te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL: %q, want 'begin'", beginSQL) + } c.Recycle() start = time.Now() te.close(false) @@ -68,7 +71,7 @@ func TestTxEngineClose(t *testing.T) { // Immediate close. te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -82,7 +85,7 @@ func TestTxEngineClose(t *testing.T) { // Normal close with short grace period. te.shutdownGracePeriod = 250 * time.Millisecond te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -99,7 +102,7 @@ func TestTxEngineClose(t *testing.T) { // Normal close with short grace period, but pool gets empty early. te.shutdownGracePeriod = 250 * time.Millisecond te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -123,7 +126,7 @@ func TestTxEngineClose(t *testing.T) { // Immediate close, but connection is in use. te.open() - c, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + c, beginSQL, err = te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -492,6 +495,6 @@ func startTransaction(te *TxEngine, writeTransaction bool) error { } else { options.TransactionIsolation = querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY } - _, err := te.Begin(context.Background(), options) + _, _, err := te.Begin(context.Background(), options) return err } diff --git a/go/vt/vttablet/tabletserver/tx_executor.go b/go/vt/vttablet/tabletserver/tx_executor.go index f9d00174c40..e485174745f 100644 --- a/go/vt/vttablet/tabletserver/tx_executor.go +++ b/go/vt/vttablet/tabletserver/tx_executor.go @@ -68,7 +68,7 @@ func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error { return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "prepare failed for transaction %d: %v", transactionID, err) } - localConn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + localConn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -79,7 +79,7 @@ func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error { return err } - err = txe.te.txPool.LocalCommit(txe.ctx, localConn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, localConn, txe.messager) if err != nil { return err } @@ -111,7 +111,7 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error { txe.markFailed(ctx, dtid) return err } - err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager) if err != nil { txe.markFailed(ctx, dtid) return err @@ -130,7 +130,7 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error { func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) { tabletenv.InternalErrors.Add("TwopcCommit", 1) txe.te.preparedPool.SetFailed(dtid) - conn, err := txe.te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err) return @@ -142,7 +142,7 @@ func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) { return } - if err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager); err != nil { + if _, err = txe.te.txPool.LocalCommit(ctx, conn, txe.messager); err != nil { log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err) } } @@ -170,7 +170,7 @@ func (txe *TxExecutor) RollbackPrepared(dtid string, originalID int64) error { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } defer tabletenv.QueryStats.Record("ROLLBACK_PREPARED", time.Now()) - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { goto returnConn } @@ -181,7 +181,7 @@ func (txe *TxExecutor) RollbackPrepared(dtid string, originalID int64) error { goto returnConn } - err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) returnConn: if preparedConn := txe.te.preparedPool.FetchForRollback(dtid); preparedConn != nil { @@ -200,7 +200,7 @@ func (txe *TxExecutor) CreateTransaction(dtid string, participants []*querypb.Ta return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } defer tabletenv.QueryStats.Record("CREATE_TRANSACTION", time.Now()) - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -210,7 +210,8 @@ func (txe *TxExecutor) CreateTransaction(dtid string, participants []*querypb.Ta if err != nil { return err } - return txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + return err } // StartCommit atomically commits the transaction along with the @@ -232,7 +233,8 @@ func (txe *TxExecutor) StartCommit(transactionID int64, dtid string) error { if err != nil { return err } - return txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + return err } // SetRollback transitions the 2pc transaction to the Rollback state. @@ -248,7 +250,7 @@ func (txe *TxExecutor) SetRollback(dtid string, transactionID int64) error { txe.te.txPool.Rollback(txe.ctx, transactionID) } - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -259,7 +261,7 @@ func (txe *TxExecutor) SetRollback(dtid string, transactionID int64) error { return err } - err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) if err != nil { return err } @@ -275,7 +277,7 @@ func (txe *TxExecutor) ConcludeTransaction(dtid string) error { } defer tabletenv.QueryStats.Record("RESOLVE", time.Now()) - conn, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) + conn, _, err := txe.te.txPool.LocalBegin(txe.ctx, &querypb.ExecuteOptions{}) if err != nil { return err } @@ -285,7 +287,8 @@ func (txe *TxExecutor) ConcludeTransaction(dtid string) error { if err != nil { return err } - return txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + _, err = txe.te.txPool.LocalCommit(txe.ctx, conn, txe.messager) + return err } // ReadTransaction returns the metadata for the sepcified dtid. diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go index 4d766643792..5f068fba724 100644 --- a/go/vt/vttablet/tabletserver/tx_pool.go +++ b/go/vt/vttablet/tabletserver/tx_pool.go @@ -192,23 +192,26 @@ func (axp *TxPool) WaitForEmpty() { axp.activePool.WaitForEmpty() } -// Begin begins a transaction, and returns the associated transaction id. +// Begin begins a transaction, and returns the associated transaction id and +// the statements (if any) executed to initiate the transaction. In autocommit +// mode the statement will be "". +// // Subsequent statements can access the connection through the transaction id. -func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, error) { +func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) (int64, string, error) { var conn *connpool.DBConn var err error immediateCaller := callerid.ImmediateCallerIDFromContext(ctx) effectiveCaller := callerid.EffectiveCallerIDFromContext(ctx) if !axp.limiter.Get(immediateCaller, effectiveCaller) { - return 0, vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "per-user transaction pool connection limit exceeded") + return 0, "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "per-user transaction pool connection limit exceeded") } waiterCount := axp.waiters.Add(1) defer axp.waiters.Add(-1) if waiterCount > axp.waiterCap.Get() { - return 0, vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool waiter count exceeded") + return 0, "", vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool waiter count exceeded") } var beginSucceeded bool @@ -231,30 +234,33 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) ( if err != nil { switch err { case connpool.ErrConnPoolClosed: - return 0, err + return 0, "", err case pools.ErrTimeout: axp.LogActive() - return 0, vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool connection limit exceeded") + return 0, "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool connection limit exceeded") } - return 0, err + return 0, "", err } autocommitTransaction := false - + beginQueries := "" if queries, ok := txIsolations[options.GetTransactionIsolation()]; ok { if queries.setIsolationLevel != "" { if _, err := conn.Exec(ctx, "set transaction isolation level "+queries.setIsolationLevel, 1, false); err != nil { - return 0, err + return 0, "", err } + + beginQueries = queries.setIsolationLevel + "; " } if _, err := conn.Exec(ctx, queries.openTransaction, 1, false); err != nil { - return 0, err + return 0, "", err } + beginQueries = beginQueries + queries.openTransaction } else if options.GetTransactionIsolation() == querypb.ExecuteOptions_AUTOCOMMIT { autocommitTransaction = true } else { - return 0, fmt.Errorf("don't know how to open a transaction of this type: %v", options.GetTransactionIsolation()) + return 0, "", fmt.Errorf("don't know how to open a transaction of this type: %v", options.GetTransactionIsolation()) } beginSucceeded = true @@ -271,14 +277,14 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) ( ), options.GetWorkload() != querypb.ExecuteOptions_DBA, ) - return transactionID, nil + return transactionID, beginQueries, nil } // Commit commits the specified transaction. -func (axp *TxPool) Commit(ctx context.Context, transactionID int64, mc messageCommitter) error { +func (axp *TxPool) Commit(ctx context.Context, transactionID int64, mc messageCommitter) (string, error) { conn, err := axp.Get(transactionID, "for commit") if err != nil { - return err + return "", err } return axp.LocalCommit(ctx, conn, mc) } @@ -305,30 +311,31 @@ func (axp *TxPool) Get(transactionID int64, reason string) (*TxConnection, error // LocalBegin is equivalent to Begin->Get. // It's used for executing transactions within a request. It's safe // to always call LocalConclude at the end. -func (axp *TxPool) LocalBegin(ctx context.Context, options *querypb.ExecuteOptions) (*TxConnection, error) { - transactionID, err := axp.Begin(ctx, options) +func (axp *TxPool) LocalBegin(ctx context.Context, options *querypb.ExecuteOptions) (*TxConnection, string, error) { + transactionID, beginSQL, err := axp.Begin(ctx, options) if err != nil { - return nil, err + return nil, "", err } - return axp.Get(transactionID, "for local query") + conn, err := axp.Get(transactionID, "for local query") + return conn, beginSQL, err } // LocalCommit is the commit function for LocalBegin. -func (axp *TxPool) LocalCommit(ctx context.Context, conn *TxConnection, mc messageCommitter) error { +func (axp *TxPool) LocalCommit(ctx context.Context, conn *TxConnection, mc messageCommitter) (string, error) { defer conn.conclude(TxCommit, "transaction committed") defer mc.LockDB(conn.NewMessages, conn.ChangedMessages)() if conn.Autocommit { mc.UpdateCaches(conn.NewMessages, conn.ChangedMessages) - return nil + return "", nil } if _, err := conn.Exec(ctx, "commit", 1, false); err != nil { conn.Close() - return err + return "", err } mc.UpdateCaches(conn.NewMessages, conn.ChangedMessages) - return nil + return "commit", nil } // LocalConclude concludes a transaction started by LocalBegin. diff --git a/go/vt/vttablet/tabletserver/tx_pool_test.go b/go/vt/vttablet/tabletserver/tx_pool_test.go index d58b0357319..b1d54686b1d 100644 --- a/go/vt/vttablet/tabletserver/tx_pool_test.go +++ b/go/vt/vttablet/tabletserver/tx_pool_test.go @@ -40,6 +40,42 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/messager" ) +func TestTxPoolExecuteCommit(t *testing.T) { + sql := "update test_column set x=1 where 1!=1" + db := fakesqldb.New(t) + defer db.Close() + db.AddQuery(sql, &sqltypes.Result{}) + db.AddQuery("begin", &sqltypes.Result{}) + db.AddQuery("commit", &sqltypes.Result{}) + + txPool := newTxPool() + txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) + defer txPool.Close() + ctx := context.Background() + transactionID, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + if err != nil { + t.Fatal(err) + } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } + txConn, err := txPool.Get(transactionID, "for query") + if err != nil { + t.Fatal(err) + } + txConn.RecordQuery(sql) + _, err = txConn.Exec(ctx, sql, 1, true) + txConn.Recycle() + + commitSQL, err := txPool.Commit(ctx, transactionID, &fakeMessageCommitter{}) + if err != nil { + t.Fatal(err) + } + if commitSQL != "commit" { + t.Errorf("commitSQL got %q want 'commit'", commitSQL) + } +} + func TestTxPoolExecuteRollback(t *testing.T) { sql := "alter table test_table add test_column int" db := fakesqldb.New(t) @@ -52,10 +88,13 @@ func TestTxPoolExecuteRollback(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - transactionID, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + transactionID, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } txConn, err := txPool.Get(transactionID, "for query") if err != nil { t.Fatal(err) @@ -79,11 +118,11 @@ func TestTxPoolRollbackNonBusy(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - txid1, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + txid1, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } - _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + _, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -173,7 +212,7 @@ func TestTxPoolTransactionKillerEnforceTimeoutEnabled(t *testing.T) { } func addQuery(ctx context.Context, sql string, txPool *TxPool, workload querypb.ExecuteOptions_Workload) (int64, error) { - transactionID, err := txPool.Begin(ctx, &querypb.ExecuteOptions{Workload: workload}) + transactionID, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{Workload: workload}) if err != nil { return 0, err } @@ -199,10 +238,13 @@ func TestTxPoolClientRowsFound(t *testing.T) { // Start a 'normal' transaction. It should take a connection // for the normal 'conns' pool. - id1, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id1, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } if got, want := txPool.conns.Available(), startNormalSize-1; got != want { t.Errorf("Normal pool size: %d, want %d", got, want) } @@ -212,10 +254,13 @@ func TestTxPoolClientRowsFound(t *testing.T) { // Start a 'foundRows' transaction. It should take a connection // from the foundRows pool. - id2, err := txPool.Begin(ctx, &querypb.ExecuteOptions{ClientFoundRows: true}) + id2, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{ClientFoundRows: true}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } if got, want := txPool.conns.Available(), startNormalSize-1; got != want { t.Errorf("Normal pool size: %d, want %d", got, want) } @@ -253,16 +298,23 @@ func TestTxPoolTransactionIsolation(t *testing.T) { ctx := context.Background() // Start a transaction with default. It should not change isolation. - _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + _, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } + if beginSQL != "begin" { + t.Errorf("beginSQL got %q want 'begin'", beginSQL) + } db.AddQuery("set transaction isolation level READ COMMITTED", &sqltypes.Result{}) - _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_READ_COMMITTED}) + _, beginSQL, err = txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_READ_COMMITTED}) if err != nil { t.Fatal(err) } + wantBeginSQL := "READ COMMITTED; begin" + if beginSQL != wantBeginSQL { + t.Errorf("beginSQL got %q want %q", beginSQL, wantBeginSQL) + } } func TestTxPoolAutocommit(t *testing.T) { @@ -276,14 +328,20 @@ func TestTxPoolAutocommit(t *testing.T) { // to mysql. // This test is meaningful because if txPool.Begin were to send a BEGIN statement to the connection, it will fatal // because is not in the list of expected queries (i.e db.AddQuery hasn't been called). - txid, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT}) + txid, beginSQL, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT}) if err != nil { t.Fatal(err) } - err = txPool.Commit(ctx, txid, &fakeMessageCommitter{}) + if beginSQL != "" { + t.Errorf("beginSQL got %q want ''", beginSQL) + } + commitSQL, err := txPool.Commit(ctx, txid, &fakeMessageCommitter{}) if err != nil { t.Fatal(err) } + if commitSQL != "" { + t.Errorf("commitSQL got %q want ''", commitSQL) + } } // TestTxPoolBeginWithPoolConnectionError_TransientErrno2006 tests the case @@ -305,7 +363,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2006_Transient(t *testing.T) { } ctx := context.Background() - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatalf("Begin should have succeeded after the retry in DBConn.Exec(): %v", err) } @@ -336,7 +394,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2006_Permanent(t *testing.T) { // After that, vttablet will automatically try to reconnect and this fail. // DBConn.Exec() will return the reconnect error as final error and not the // initial connection error. - _, err = txPool.LocalBegin(context.Background(), &querypb.ExecuteOptions{}) + _, _, err = txPool.LocalBegin(context.Background(), &querypb.ExecuteOptions{}) if err == nil || !strings.Contains(err.Error(), "(errno 2013)") { t.Fatalf("Begin did not return the reconnect error: %v", err) } @@ -362,7 +420,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2013(t *testing.T) { db.EnableShouldClose() // 2013 is not retryable. DBConn.Exec() fails after the first attempt. - _, err = txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) + _, _, err = txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) if err == nil || !strings.Contains(err.Error(), "(errno 2013)") { t.Fatalf("Begin must return connection error with MySQL errno 2013: %v", err) } @@ -385,7 +443,7 @@ func primeTxPoolWithConnection(t *testing.T) (*fakesqldb.DB, *TxPool, error) { db.AddQuery("begin", &sqltypes.Result{}) db.AddQuery("rollback", &sqltypes.Result{}) ctx := context.Background() - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { return nil, nil, err } @@ -402,7 +460,7 @@ func TestTxPoolBeginWithError(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) want := "error: rejected" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("Begin: %v, want %s", err, want) @@ -424,7 +482,7 @@ func TestTxPoolRollbackFail(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() ctx := context.Background() - transactionID, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + transactionID, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -467,7 +525,7 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { db.AddQuery("rollback", &sqltypes.Result{}) txPool := newTxPool() txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) - id, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) txPool.Close() assertErrorMatch := func(id int64, reason string) { @@ -486,14 +544,14 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { txPool = newTxPool() txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) - if err := txPool.Commit(ctx, id, &fakeMessageCommitter{}); err != nil { + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + if _, err := txPool.Commit(ctx, id, &fakeMessageCommitter{}); err != nil { t.Fatalf("got error: %v", err) } assertErrorMatch(id, "transaction committed") - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) if err := txPool.Rollback(ctx, id); err != nil { t.Fatalf("got error: %v", err) } @@ -506,13 +564,13 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) defer txPool.Close() - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) time.Sleep(5 * time.Millisecond) assertErrorMatch(id, "exceeded timeout: 1ms") txPool.SetTimeout(1 * time.Hour) - id, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) + id, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}) txc, err := txPool.Get(id, "for close") if err != nil { t.Fatalf("got error: %v", err) @@ -545,7 +603,7 @@ func TestTxPoolExecFailDueToConnFail_Errno2006(t *testing.T) { ctx := context.Background() // Start the transaction. - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -583,7 +641,7 @@ func TestTxPoolExecFailDueToConnFail_Errno2013(t *testing.T) { ctx := context.Background() // Start the transaction. - txConn, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) + txConn, _, err := txPool.LocalBegin(ctx, &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } @@ -615,7 +673,7 @@ func TestTxPoolCloseKillsStrayTransactions(t *testing.T) { txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) // Start stray transaction. - _, err := txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) + _, _, err := txPool.Begin(context.Background(), &querypb.ExecuteOptions{}) if err != nil { t.Fatal(err) } From 575395cdf5c1ff429e82aef7794e05a97b965c20 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Fri, 19 Apr 2019 14:04:49 -0700 Subject: [PATCH 2/2] remove misleading BEGIN / COMMIT QueryStats entries as well Signed-off-by: Michael Demmer --- go/vt/vttablet/tabletserver/tabletserver.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 662bd5c7b66..8866b6e456f 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -776,7 +776,7 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, opti "Begin", "begin", nil, target, options, true /* isBegin */, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - defer tabletenv.QueryStats.Record("BEGIN", time.Now()) + startTime := time.Now() if tsv.txThrottler.Throttle() { // TODO(erez): I think this should be RESOURCE_EXHAUSTED. return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "Transaction throttled") @@ -786,10 +786,13 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, opti logStats.TransactionID = transactionID // Record the actual statements that were executed in the logStats. - // If nothing was actually executed, clear out the Method so that + // If nothing was actually executed, don't count the operation in + // the tablet metrics, and clear out the logStats Method so that // handlePanicAndSendLogStats doesn't log the no-op. logStats.OriginalSQL = beginSQL - if beginSQL == "" { + if beginSQL != "" { + tabletenv.QueryStats.Record("BEGIN", startTime) + } else { logStats.Method = "" } return err @@ -805,18 +808,20 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra "Commit", "commit", nil, target, nil, false /* isBegin */, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - defer tabletenv.QueryStats.Record("COMMIT", time.Now()) + startTime := time.Now() logStats.TransactionID = transactionID var commitSQL string commitSQL, err = tsv.teCtrl.Commit(ctx, transactionID, tsv.messager) - // If nothing was actually executed, clear out the Method so that + // If nothing was actually executed, don't count the operation in + // the tablet metrics, and clear out the logStats Method so that // handlePanicAndSendLogStats doesn't log the no-op. - if commitSQL == "" { + if commitSQL != "" { + tabletenv.QueryStats.Record("COMMIT", startTime) + } else { logStats.Method = "" } - return err }, )