Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
421 changes: 234 additions & 187 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (qre *QueryExecutor) execDmlAutoCommit() (reply *sqltypes.Result, err error
}

func (qre *QueryExecutor) execAsTransaction(f func(conn *TxConnection) (*sqltypes.Result, error)) (reply *sqltypes.Result, err error) {
conn, err := qre.tsv.te.txPool.LocalBegin(qre.ctx, qre.options.GetClientFoundRows())
conn, err := qre.tsv.te.txPool.LocalBegin(qre.ctx, qre.options.GetClientFoundRows(), qre.options.GetTransactionIsolation())
if err != nil {
return nil, err
}
Expand Down
106 changes: 81 additions & 25 deletions go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestQueryExecutorPlanPassDmlRBR(t *testing.T) {
// RBR mode
tsv := newTestTabletServer(ctx, noFlags, db)
defer tsv.StopService()
txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
tsv.qe.binlogFormat = connpool.BinlogFormatRow
checkPlanID(t, planbuilder.PlanPassDML, qre.plan.PlanID)
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestQueryExecutorPlanPassDmlReplaceInto(t *testing.T) {
// RBR mode
tsv := newTestTabletServer(ctx, noFlags, db)
defer tsv.StopService()
txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
tsv.qe.binlogFormat = connpool.BinlogFormatRow
checkPlanID(t, planbuilder.PlanPassDML, qre.plan.PlanID)
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestQueryExecutorPlanInsertMessage(t *testing.T) {
t.Errorf("rows:\n%+v, want\n%+v", mr, wantqr)
}

txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre = newTestQueryExecutor(ctx, tsv, query, txid)
defer testCommitHelper(t, tsv, qre)
got, err = qre.Execute()
Expand Down Expand Up @@ -357,7 +357,7 @@ func TestQueryExecutorPlanInsertSubQuery(t *testing.T) {
db.AddQuery(insertQuery, &sqltypes.Result{})
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre := newTestQueryExecutor(ctx, tsv, query, txid)

defer tsv.StopService()
Expand Down Expand Up @@ -402,7 +402,7 @@ func TestQueryExecutorPlanInsertSubQueryRBR(t *testing.T) {
db.AddQuery(insertQuery, &sqltypes.Result{})
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
tsv.qe.binlogFormat = connpool.BinlogFormatRow

Expand Down Expand Up @@ -431,7 +431,7 @@ func TestQueryExecutorPlanUpsertPk(t *testing.T) {
query := "insert into test_table(pk) values(1) on duplicate key update val=1"
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
defer tsv.StopService()
defer testCommitHelper(t, tsv, qre)
Expand All @@ -450,7 +450,7 @@ func TestQueryExecutorPlanUpsertPk(t *testing.T) {
}

db.AddRejectedQuery("insert into test_table(pk) values (1) /* _stream test_table (pk ) (1 ); */", errRejected)
txid = newTransaction(tsv)
txid = newTransaction(tsv, nil)
qre = newTestQueryExecutor(ctx, tsv, query, txid)
defer testCommitHelper(t, tsv, qre)
_, err = qre.Execute()
Expand All @@ -467,7 +467,7 @@ func TestQueryExecutorPlanUpsertPk(t *testing.T) {
mysql.NewSQLError(mysql.ERDupEntry, mysql.SSDupKey, "err"),
)
db.AddQuery("update test_table(pk) set val = 1 where pk in (1) /* _stream test_table (pk ) (1 ); */", &sqltypes.Result{})
txid = newTransaction(tsv)
txid = newTransaction(tsv, nil)
qre = newTestQueryExecutor(ctx, tsv, query, txid)
defer testCommitHelper(t, tsv, qre)
_, err = qre.Execute()
Expand All @@ -488,7 +488,7 @@ func TestQueryExecutorPlanUpsertPk(t *testing.T) {
"update test_table set val = 1 where pk in (1) /* _stream test_table (pk ) (1 ); */",
&sqltypes.Result{RowsAffected: 1},
)
txid = newTransaction(tsv)
txid = newTransaction(tsv, nil)
qre = newTestQueryExecutor(ctx, tsv, query, txid)
defer testCommitHelper(t, tsv, qre)
got, err = qre.Execute()
Expand Down Expand Up @@ -516,7 +516,7 @@ func TestQueryExecutorPlanUpsertPk(t *testing.T) {
"update test_table set pk = 2 where pk in (1) /* _stream test_table (pk ) (1 ) (2 ); */",
&sqltypes.Result{RowsAffected: 1},
)
txid = newTransaction(tsv)
txid = newTransaction(tsv, nil)
qre = newTestQueryExecutor(ctx, tsv, "insert into test_table(pk) values (1) on duplicate key update pk=2", txid)
defer testCommitHelper(t, tsv, qre)
got, err = qre.Execute()
Expand Down Expand Up @@ -545,7 +545,7 @@ func TestQueryExecutorPlanUpsertPkSingleUnique(t *testing.T) {
want := &sqltypes.Result{}
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre := newTestQueryExecutor(ctx, tsv, query[0:strings.Index(query, " /*")], txid)
defer tsv.StopService()
defer testCommitHelper(t, tsv, qre)
Expand All @@ -569,7 +569,7 @@ func TestQueryExecutorPlanUpsertPkSingleUnique(t *testing.T) {
want = &sqltypes.Result{}
ctx = context.Background()
tsv = newTestTabletServer(ctx, noFlags, db)
txid = newTransaction(tsv)
txid = newTransaction(tsv, nil)
qre = newTestQueryExecutor(ctx, tsv, query[0:strings.Index(query, " /*")], txid)
defer testCommitHelper(t, tsv, qre)
checkPlanID(t, planbuilder.PlanInsertPK, qre.plan.PlanID)
Expand All @@ -592,7 +592,7 @@ func TestQueryExecutorPlanUpsertPkSingleUnique(t *testing.T) {
want = &sqltypes.Result{}
ctx = context.Background()
tsv = newTestTabletServer(ctx, noFlags, db)
txid = newTransaction(tsv)
txid = newTransaction(tsv, nil)
qre = newTestQueryExecutor(ctx, tsv, query[0:strings.Index(query, " /*")], txid)
defer testCommitHelper(t, tsv, qre)
checkPlanID(t, planbuilder.PlanInsertPK, qre.plan.PlanID)
Expand All @@ -614,7 +614,7 @@ func TestQueryExecutorPlanUpsertPkSingleUnique(t *testing.T) {
want = &sqltypes.Result{}
ctx = context.Background()
tsv = newTestTabletServer(ctx, noFlags, db)
txid = newTransaction(tsv)
txid = newTransaction(tsv, nil)
qre = newTestQueryExecutor(ctx, tsv, query[0:strings.Index(query, " /*")], txid)
defer testCommitHelper(t, tsv, qre)
checkPlanID(t, planbuilder.PlanInsertPK, qre.plan.PlanID)
Expand All @@ -641,7 +641,7 @@ func TestQueryExecutorPlanUpsertPkRBR(t *testing.T) {
want := &sqltypes.Result{}
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
tsv.qe.binlogFormat = connpool.BinlogFormatRow
defer tsv.StopService()
Expand Down Expand Up @@ -726,7 +726,37 @@ func TestQueryExecutorPlanDmlPk(t *testing.T) {
db.AddQuery(query, want)
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
defer tsv.StopService()
defer testCommitHelper(t, tsv, qre)
checkPlanID(t, planbuilder.PlanDMLPK, qre.plan.PlanID)
got, err := qre.Execute()
if err != nil {
t.Fatalf("qre.Execute() = %v, want nil", err)
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got: %v, want: %v", got, want)
}
wantqueries := []string{"update test_table set name = 2 where pk in (1) /* _stream test_table (pk ) (1 ); */"}
gotqueries := fetchRecordedQueries(qre)
if !reflect.DeepEqual(gotqueries, wantqueries) {
t.Errorf("queries: %v, want %v", gotqueries, wantqueries)
}
}

func TestQueryExecutorPlanDmlPkTransactionIsolation(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
query := "update test_table set name = 2 where pk in (1) /* _stream test_table (pk ) (1 ); */"
want := &sqltypes.Result{}
db.AddQuery(query, want)
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
db.AddQuery("set transaction isolation level SERIALIZABLE", &sqltypes.Result{})
txid := newTransaction(tsv, &querypb.ExecuteOptions{
TransactionIsolation: querypb.ExecuteOptions_SERIALIZABLE,
})
qre := newTestQueryExecutor(ctx, tsv, query, txid)
defer tsv.StopService()
defer testCommitHelper(t, tsv, qre)
Expand Down Expand Up @@ -755,7 +785,7 @@ func TestQueryExecutorPlanDmlPkRBR(t *testing.T) {
db.AddQuery(query, want)
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
tsv.qe.binlogFormat = connpool.BinlogFormatRow
defer tsv.StopService()
Expand Down Expand Up @@ -794,7 +824,7 @@ func TestQueryExecutorPlanDmlMessage(t *testing.T) {
db.AddQuery("update msg set time_acked = 2, time_next = null where (time_scheduled = 12 and id = 1) /* _stream msg (time_scheduled id ) (12 1 ); */", want)
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
defer tsv.StopService()
defer testCommitHelper(t, tsv, qre)
Expand Down Expand Up @@ -834,6 +864,32 @@ func TestQueryExecutorPlanDmlAutoCommit(t *testing.T) {
}
}

func TestQueryExecutorPlanDmlAutoCommitTransactionIsolation(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
query := "update test_table set name = 2 where pk in (1) /* _stream test_table (pk ) (1 ); */"
want := &sqltypes.Result{}
db.AddQuery(query, want)
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
qre := newTestQueryExecutor(ctx, tsv, query, 0)

qre.options = &querypb.ExecuteOptions{
TransactionIsolation: querypb.ExecuteOptions_READ_UNCOMMITTED,
}
db.AddQuery("set transaction isolation level READ UNCOMMITTED", &sqltypes.Result{})

defer tsv.StopService()
checkPlanID(t, planbuilder.PlanDMLPK, qre.plan.PlanID)
got, err := qre.Execute()
if err != nil {
t.Fatalf("qre.Execute() = %v, want nil", err)
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got: %v, want: %v", got, want)
}
}

func TestQueryExecutorPlanDmlSubQuery(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
Expand All @@ -854,7 +910,7 @@ func TestQueryExecutorPlanDmlSubQuery(t *testing.T) {
db.AddQuery(updateQuery, want)
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
defer tsv.StopService()
defer testCommitHelper(t, tsv, qre)
Expand Down Expand Up @@ -895,7 +951,7 @@ func TestQueryExecutorPlanDmlSubQueryRBR(t *testing.T) {
db.AddQuery(updateQuery, want)
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
tsv.qe.binlogFormat = connpool.BinlogFormatRow
defer tsv.StopService()
Expand Down Expand Up @@ -947,7 +1003,7 @@ func TestQueryExecutorPlanOtherWithinATransaction(t *testing.T) {
db.AddQuery(query, want)
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
defer tsv.StopService()
defer testCommitHelper(t, tsv, qre)
Expand Down Expand Up @@ -984,7 +1040,7 @@ func TestQueryExecutorPlanPassSelectWithInATransaction(t *testing.T) {
})
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre := newTestQueryExecutor(ctx, tsv, query, txid)
defer tsv.StopService()
defer testCommitHelper(t, tsv, qre)
Expand Down Expand Up @@ -1098,7 +1154,7 @@ func TestQueryExecutorPlanSet(t *testing.T) {
}

// Test inside transaction.
txid := newTransaction(tsv)
txid := newTransaction(tsv, nil)
qre = newTestQueryExecutor(ctx, tsv, setQuery, txid)
got, err = qre.Execute()
if err != nil {
Expand Down Expand Up @@ -1793,8 +1849,8 @@ func newTestTabletServer(ctx context.Context, flags executorFlags, db *fakesqldb
return tsv
}

func newTransaction(tsv *TabletServer) int64 {
transactionID, err := tsv.Begin(context.Background(), &tsv.target, nil)
func newTransaction(tsv *TabletServer, options *querypb.ExecuteOptions) int64 {
transactionID, err := tsv.Begin(context.Background(), &tsv.target, options)
if err != nil {
panic(fmt.Errorf("failed to start a transaction: %v", err))
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, opti
// TODO(erez): I think this should be RESOURCE_EXHAUSTED.
return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "Transaction throttled")
}
transactionID, err = tsv.te.txPool.Begin(ctx, options.GetClientFoundRows())
transactionID, err = tsv.te.txPool.Begin(ctx, options.GetClientFoundRows(), options.GetTransactionIsolation())
logStats.TransactionID = transactionID
return err
},
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletserver/tx_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/youtube/vitess/go/vt/concurrency"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/dtids"
"github.com/youtube/vitess/go/vt/proto/query"
"github.com/youtube/vitess/go/vt/vtgate/vtgateconn"
"github.com/youtube/vitess/go/vt/vttablet/tabletserver/connpool"
"github.com/youtube/vitess/go/vt/vttablet/tabletserver/tabletenv"
Expand Down Expand Up @@ -203,7 +204,7 @@ outer:
if txid > maxid {
maxid = txid
}
conn, err := te.txPool.LocalBegin(ctx, false)
conn, err := te.txPool.LocalBegin(ctx, false, query.ExecuteOptions_DEFAULT)
if err != nil {
allErr.RecordError(err)
continue
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vttablet/tabletserver/tx_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/youtube/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "github.com/youtube/vitess/go/vt/proto/query"
"golang.org/x/net/context"
)

Expand All @@ -47,7 +48,7 @@ func TestTxEngineClose(t *testing.T) {

// Normal close with timeout wait.
te.Open(dbconfigs)
c, err := te.txPool.LocalBegin(ctx, false)
c, err := te.txPool.LocalBegin(ctx, false, querypb.ExecuteOptions_DEFAULT)
if err != nil {
t.Fatal(err)
}
Expand All @@ -60,7 +61,7 @@ func TestTxEngineClose(t *testing.T) {

// Immediate close.
te.Open(dbconfigs)
c, err = te.txPool.LocalBegin(ctx, false)
c, err = te.txPool.LocalBegin(ctx, false, querypb.ExecuteOptions_DEFAULT)
if err != nil {
t.Fatal(err)
}
Expand All @@ -74,7 +75,7 @@ func TestTxEngineClose(t *testing.T) {
// Normal close with short grace period.
te.shutdownGracePeriod = 250 * time.Millisecond
te.Open(dbconfigs)
c, err = te.txPool.LocalBegin(ctx, false)
c, err = te.txPool.LocalBegin(ctx, false, querypb.ExecuteOptions_DEFAULT)
if err != nil {
t.Fatal(err)
}
Expand All @@ -91,7 +92,7 @@ func TestTxEngineClose(t *testing.T) {
// Normal close with short grace period, but pool gets empty early.
te.shutdownGracePeriod = 250 * time.Millisecond
te.Open(dbconfigs)
c, err = te.txPool.LocalBegin(ctx, false)
c, err = te.txPool.LocalBegin(ctx, false, querypb.ExecuteOptions_DEFAULT)
if err != nil {
t.Fatal(err)
}
Expand All @@ -115,7 +116,7 @@ func TestTxEngineClose(t *testing.T) {

// Immediate close, but connection is in use.
te.Open(dbconfigs)
c, err = te.txPool.LocalBegin(ctx, false)
c, err = te.txPool.LocalBegin(ctx, false, querypb.ExecuteOptions_DEFAULT)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading