Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
case planbuilder.PlanPassSelect:
return qre.execSelect()
case planbuilder.PlanSelectLock:
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "disallowed outside transaction")
if false || !qre.tsv.qe.autoCommit.Get() {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "disallowed outside transaction")
}
return qre.execDmlAutoCommit()
case planbuilder.PlanSet:
return qre.execSet()
case planbuilder.PlanOtherRead:
Expand Down Expand Up @@ -244,6 +247,8 @@ func (qre *QueryExecutor) execDmlAutoCommit() (reply *sqltypes.Result, err error
reply, err = qre.execDMLSubquery(conn)
case planbuilder.PlanUpsertPK:
reply, err = qre.execUpsertPK(conn)
case planbuilder.PlanSelectLock:
return qre.execDirect(conn)
default:
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported query: %s", qre.query)
}
Expand Down
74 changes: 71 additions & 3 deletions go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package tabletserver

import (
"encoding/json"
"fmt"
"io"
"math/rand"
Expand Down Expand Up @@ -846,7 +847,7 @@ func TestQueryExecutorPlanDmlMessage(t *testing.T) {
conn.Recycle()
}

func TestQueryExecutorPlanDmlAutoCommit(t *testing.T) {
func TestQueryExecutorPlanUpdateAutoCommit(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
query := "update test_table set name = 2 where pk in (1) /* _stream test_table (pk ) (1 ); */"
Expand All @@ -866,6 +867,26 @@ func TestQueryExecutorPlanDmlAutoCommit(t *testing.T) {
}
}

func TestQueryExecutorPlanDeleteAutoCommit(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
query := "delete from test_table where pk in (1) /* _stream test_table (pk ) (1 ); */"
want := &sqltypes.Result{}
db.AddQuery(query, want)
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
qre := newTestQueryExecutor(ctx, tsv, query, 0)
defer tsv.StopService()
checkPlanID(t, planbuilder.PlanDMLPK, qre.plan.PlanID)
got, err := qre.Execute()
if err != nil {
t.Fatalf("qre.Execute() = %v, want nil", err)
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got: %v, want: %v", got, want)
}
}

func TestQueryExecutorPlanDmlAutoCommitTransactionIsolation(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
Expand Down Expand Up @@ -1072,7 +1093,7 @@ func TestQueryExecutorPlanPassSelectWithLockOutsideATransaction(t *testing.T) {
Fields: getTestTableFields(),
})
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
tsv := newTestTabletServer(ctx, noAutoCommit, db)
qre := newTestQueryExecutor(ctx, tsv, query, 0)
defer tsv.StopService()
checkPlanID(t, planbuilder.PlanSelectLock, qre.plan.PlanID)
Expand All @@ -1082,6 +1103,42 @@ func TestQueryExecutorPlanPassSelectWithLockOutsideATransaction(t *testing.T) {
}
}

// When handling DMLs with an owned secondary vindex, vtgate will sometimes send
// a SELECT ... FOR UPDATE query to find out the current value of the secondary
// index field, so this needs to be supported when autocommit is enabled.
func TestQueryExecutorPlanSelectForUpdateAutoCommit(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
query := "select name from test_table where pk in (1) limit 10001 for update"
want := &sqltypes.Result{
Fields: []*querypb.Field{
{
Name: "name",
Type: sqltypes.VarChar,
},
},
RowsAffected: 1,
Rows: [][]sqltypes.Value{
{sqltypes.NewVarChar("name")},
},
}
db.AddQuery(query, want)
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
qre := newTestQueryExecutor(ctx, tsv, query, 0)
defer tsv.StopService()
checkPlanID(t, planbuilder.PlanSelectLock, qre.plan.PlanID)
got, err := qre.Execute()
if err != nil {
t.Fatalf("qre.Execute() = %v, want nil", err)
}
if !reflect.DeepEqual(got, want) {
x, _ := json.MarshalIndent(got, "", " ")
y, _ := json.MarshalIndent(want, "", " ")
t.Fatalf("got: %v, want: %v", string(x), string(y))
}
}

func TestQueryExecutorPlanPassSelect(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
Expand Down Expand Up @@ -1813,6 +1870,7 @@ const (
smallTxPool
noTwopc
shortTwopcAge
noAutoCommit
)

// newTestQueryExecutor uses a package level variable testTabletServer defined in tabletserver_test.go
Expand All @@ -1826,7 +1884,11 @@ func newTestTabletServer(ctx context.Context, flags executorFlags, db *fakesqldb
} else {
config.TransactionCap = 100
}
config.EnableAutoCommit = true
if flags&noAutoCommit > 0 {
config.EnableAutoCommit = false
} else {
config.EnableAutoCommit = true
}
if flags&enableStrictTableACL > 0 {
config.StrictTableACL = true
} else {
Expand Down Expand Up @@ -2001,6 +2063,12 @@ func getQueryExecutorSupportedQueries(testTableHasMultipleUniqueKeys bool) map[s
Type: sqltypes.Int32,
}},
},
"select name from test_table where 1 != 1": {
Fields: []*querypb.Field{{
Name: "name",
Type: sqltypes.VarChar,
}},
},
"describe test_table": {
Fields: mysql.DescribeTableFields,
RowsAffected: 3,
Expand Down