Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion data/test/tabletserver/exec_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"FullQuery": "select distinct * from a limit :#maxLimit"
}

# grouy by
# group by
"select * from a group by b"
{
"PlanID": "PASS_SELECT",
Expand Down Expand Up @@ -186,6 +186,15 @@
"FullQuery": "insert into b.a(eid, id) values (1, :a)"
}

# insert cross-db
options:PassthroughDMLs
"insert into b.a (eid, id) values (1, :a)"
{
"PlanID": "PASS_DML",
"TableName": "",
"FullQuery": "insert into b.a(eid, id) values (1, :a)"
}

# insert with bind value
"insert into a (eid, id) values (1, :a)"
{
Expand All @@ -196,6 +205,15 @@
"PKValues": [[1], [":a"]]
}

# insert with bind value
options:PassthroughDMLs
"insert into a (eid, id) values (1, :a)"
{
"PlanID": "PASS_DML",
"TableName": "",
"FullQuery": "insert into a(eid, id) values (1, :a)"
}

# default number
"insert into a (id) values (1)"
{
Expand Down Expand Up @@ -307,6 +325,15 @@
"PKValues": [[1], [2]]
}

# upsert multiple unique index
options:PassthroughDMLs
"insert into a (eid, id) values (1, 2) on duplicate key update name = func(a)"
{
"PlanID": "PASS_DML",
"TableName": "",
"FullQuery": "insert into a(eid, id) values (1, 2) on duplicate key update name = func(a)"
}

# upsert single unique index
"insert into b (eid, id) values (1, 2) on duplicate key update name = func(a)"
{
Expand Down Expand Up @@ -610,6 +637,15 @@
"FullQuery": "replace into b(eid, id) values (1, 2), (3, 4)"
}

# multi-row
options:PassthroughDMLs
"replace into b (eid, id) values (1, 2), (3, 4)"
{
"PlanID": "PASS_DML",
"TableName": "",
"FullQuery": "replace into b(eid, id) values (1, 2), (3, 4)"
}

# single-row with set
"replace into b set eid = 1, id = 2"
{
Expand Down Expand Up @@ -648,6 +684,15 @@
"WhereClause": " where name in ('a', 'b')"
}

# update limit with pk
options:PassthroughDMLs
"update d set foo='foo' where name in ('a', 'b') limit 1"
{
"PlanID": "PASS_DML",
"TableName": "",
"FullQuery": "update d set foo = 'foo' where name in ('a', 'b') limit 1"
}

# update cross-db
"update b.a set name='foo' where eid=1 and id=1"
{
Expand All @@ -657,6 +702,15 @@
"FullQuery": "update b.a set name = 'foo' where eid = 1 and id = 1"
}

# update cross-db
options:PassthroughDMLs
"update b.a set name='foo' where eid=1 and id=1"
{
"PlanID": "PASS_DML",
"TableName": "",
"FullQuery": "update b.a set name = 'foo' where eid = 1 and id = 1"
}

# multi-table update
"update a, b set a.name = 'foo' where a.id = b.id and b.var = 'test'"
{
Expand Down Expand Up @@ -865,6 +919,15 @@
"WhereClause": " where name in ('a', 'b')"
}

# delete limit with pk
options:PassthroughDMLs
"delete from d where name in ('a', 'b') limit 1"
{
"PlanID": "PASS_DML",
"TableName": "",
"FullQuery": "delete from d where name in ('a', 'b') limit 1"
}

# delete cross-db
"delete from b.a where eid=1 and id=1"
{
Expand Down Expand Up @@ -907,6 +970,15 @@
"WhereClause": " where eid = 1 and id = 1"
}

# pk
options:PassthroughDMLs
"delete from a where eid=1 and id=1"
{
"PlanID": "PASS_DML",
"TableName": "",
"FullQuery": "delete from a where eid = 1 and id = 1"
}

# partial pk
"delete from a where eid=1"
{
Expand Down
12 changes: 12 additions & 0 deletions go/vt/vttablet/tabletserver/planbuilder/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func analyzeUpdate(upd *sqlparser.Update, tables map[string]*schema.Table) (plan
FullQuery: GenerateFullQuery(upd),
}

if PassthroughDMLs {
return plan, nil
}

if len(upd.TableExprs) > 1 {
plan.Reason = ReasonMultiTable
return plan, nil
Expand Down Expand Up @@ -95,6 +99,10 @@ func analyzeDelete(del *sqlparser.Delete, tables map[string]*schema.Table) (plan
FullQuery: GenerateFullQuery(del),
}

if PassthroughDMLs {
return plan, nil
}

if len(del.TableExprs) > 1 {
plan.Reason = ReasonMultiTable
return plan, nil
Expand Down Expand Up @@ -292,6 +300,10 @@ func analyzeInsert(ins *sqlparser.Insert, tables map[string]*schema.Table) (plan
PlanID: PlanPassDML,
FullQuery: GenerateFullQuery(ins),
}
if PassthroughDMLs {
return plan, nil
}

if ins.Action == sqlparser.ReplaceStr {
plan.Reason = ReasonReplace
return plan, nil
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletserver/planbuilder/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ var (
// ErrTooComplex indicates given sql query is too complex.
ErrTooComplex = vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "Complex")
execLimit = &sqlparser.Limit{Rowcount: sqlparser.NewValArg([]byte(":#maxLimit"))}

// PassthroughDMLs will return PlanPassDML for all update or delete statements
PassthroughDMLs = false
)

//_______________________________________________
Expand All @@ -49,6 +52,9 @@ const (
PlanNextval
// PlanPassDML is pass through update & delete statements. This is
// the default plan for update and delete statements.
// If PassthroughDMLs is true, then it is used for all DML statements
// and is valid in all replication modes.
// Otherwise is only allowed in row based replication mode
PlanPassDML
// PlanDMLPK is an update or delete with an equality where clause(s)
// on primary key(s).
Expand Down
23 changes: 18 additions & 5 deletions go/vt/vttablet/tabletserver/planbuilder/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ func toJSON(p *Plan) ([]byte, error) {
func TestPlan(t *testing.T) {
testSchema := loadSchema("schema_test.json")
for tcase := range iterateExecFile("exec_cases.txt") {
if strings.Contains(tcase.options, "PassthroughDMLs") {
PassthroughDMLs = true
}
plan, err := Build(tcase.input, testSchema)
PassthroughDMLs = false

var out string
if err != nil {
out = err.Error()
Expand Down Expand Up @@ -232,10 +237,11 @@ func loadSchema(name string) map[string]*schema.Table {
}

type testCase struct {
file string
lineno int
input string
output string
file string
lineno int
options string
input string
output string
}

func iterateExecFile(name string) (testCaseIterator chan testCase) {
Expand All @@ -250,6 +256,7 @@ func iterateExecFile(name string) (testCaseIterator chan testCase) {

r := bufio.NewReader(fd)
lineno := 0
options := ""
for {
binput, err := r.ReadBytes('\n')
if err != nil {
Expand All @@ -265,6 +272,11 @@ func iterateExecFile(name string) (testCaseIterator chan testCase) {
//fmt.Printf("%s\n", input)
continue
}

if strings.HasPrefix(input, "options:") {
options = input[8:]
continue
}
err = json.Unmarshal(binput, &input)
if err != nil {
fmt.Printf("Line: %d, input: %s\n", lineno, binput)
Expand Down Expand Up @@ -293,7 +305,8 @@ func iterateExecFile(name string) (testCaseIterator chan testCase) {
break
}
}
testCaseIterator <- testCase{name, lineno, input, string(output)}
testCaseIterator <- testCase{name, lineno, options, input, string(output)}
options = ""
}
}()
return testCaseIterator
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ type QueryEngine struct {
maxResultSize sync2.AtomicInt64
warnResultSize sync2.AtomicInt64
maxDMLRows sync2.AtomicInt64
passthroughDMLs sync2.AtomicBool
allowUnsafeDMLs bool
streamBufferSize sync2.AtomicInt64
// tableaclExemptCount count the number of accesses allowed
// based on membership in the superuser ACL
Expand Down Expand Up @@ -209,6 +211,9 @@ func NewQueryEngine(checker connpool.MySQLChecker, se *schema.Engine, config tab
qe.maxDMLRows = sync2.NewAtomicInt64(int64(config.MaxDMLRows))
qe.streamBufferSize = sync2.NewAtomicInt64(int64(config.StreamBufferSize))

qe.passthroughDMLs = sync2.NewAtomicBool(config.PassthroughDMLs)
planbuilder.PassthroughDMLs = config.PassthroughDMLs

qe.accessCheckerLogger = logutil.NewThrottledLogger("accessChecker", 1*time.Second)

qeOnce.Do(func() {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
defer conn.Recycle()
switch qre.plan.PlanID {
case planbuilder.PlanPassDML:
if qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow {
if !qre.tsv.qe.allowUnsafeDMLs && (qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow) {
return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cannot identify primary key of statement")
}
return qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, nil, false, true)
Expand Down Expand Up @@ -259,7 +259,7 @@ func (qre *QueryExecutor) execDmlAutoCommit() (reply *sqltypes.Result, err error
return qre.execAsTransaction(func(conn *TxConnection) (reply *sqltypes.Result, err error) {
switch qre.plan.PlanID {
case planbuilder.PlanPassDML:
if qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow {
if !qre.tsv.qe.allowUnsafeDMLs && (qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow) {
return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cannot identify primary key of statement")
}
reply, err = qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, nil, false, true)
Expand Down
99 changes: 99 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,62 @@ func TestQueryExecutorPlanPassDmlRBR(t *testing.T) {
testCommitHelper(t, tsv, qre)
}

func TestQueryExecutorPassthroughDml(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
planbuilder.PassthroughDMLs = true
defer func() { planbuilder.PassthroughDMLs = false }()
query := "update test_table set pk = foo()"
want := &sqltypes.Result{}
db.AddQuery(query, want)
ctx := context.Background()
// RBR mode
tsv := newTestTabletServer(ctx, noFlags, db)
defer tsv.StopService()

planbuilder.PassthroughDMLs = true
defer func() { planbuilder.PassthroughDMLs = false }()
tsv.qe.passthroughDMLs.Set(true)
tsv.qe.binlogFormat = connpool.BinlogFormatRow

txid := newTransaction(tsv, nil)
qre := newTestQueryExecutor(ctx, tsv, query, txid)

checkPlanID(t, planbuilder.PlanPassDML, qre.plan.PlanID)
got, err := qre.Execute()
if err != nil {
t.Fatalf("qre.Execute() = %v, want nil", err)
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got: %v, want: %v", got, want)
}
wantqueries := []string{query}
gotqueries := fetchRecordedQueries(qre)
if !reflect.DeepEqual(gotqueries, wantqueries) {
t.Errorf("queries: %v, want %v", gotqueries, wantqueries)
}

// Statement mode also works when allowUnsafeDMLs is true
tsv.qe.binlogFormat = connpool.BinlogFormatStatement
_, err = qre.Execute()
if code := vterrors.Code(err); code != vtrpcpb.Code_UNIMPLEMENTED {
t.Errorf("qre.Execute: %v, want %v", code, vtrpcpb.Code_INVALID_ARGUMENT)
}

tsv.qe.allowUnsafeDMLs = true
got, err = qre.Execute()
if !reflect.DeepEqual(got, want) {
t.Fatalf("got: %v, want: %v", got, want)
}
wantqueries = []string{query, query}
gotqueries = fetchRecordedQueries(qre)
if !reflect.DeepEqual(gotqueries, wantqueries) {
t.Errorf("queries: %v, want %v", gotqueries, wantqueries)
}

testCommitHelper(t, tsv, qre)
}

func TestQueryExecutorPlanPassDmlAutoCommitRBR(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
Expand Down Expand Up @@ -134,6 +190,49 @@ func TestQueryExecutorPlanPassDmlAutoCommitRBR(t *testing.T) {
}
}

func TestQueryExecutorPassthroughDmlAutoCommit(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
query := "update test_table set pk = foo()"
want := &sqltypes.Result{}
db.AddQuery(query, want)
ctx := context.Background()
// RBR mode
tsv := newTestTabletServer(ctx, noFlags, db)
defer tsv.StopService()

planbuilder.PassthroughDMLs = true
defer func() { planbuilder.PassthroughDMLs = false }()
tsv.qe.passthroughDMLs.Set(true)
tsv.qe.binlogFormat = connpool.BinlogFormatRow

qre := newTestQueryExecutor(ctx, tsv, query, 0)
checkPlanID(t, planbuilder.PlanPassDML, qre.plan.PlanID)
got, err := qre.Execute()
if err != nil {
t.Fatalf("qre.Execute() = %v, want nil", err)
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got: %v, want: %v", got, want)
}

// Statement mode
tsv.qe.binlogFormat = connpool.BinlogFormatStatement
_, err = qre.Execute()
if code := vterrors.Code(err); code != vtrpcpb.Code_UNIMPLEMENTED {
t.Errorf("qre.Execute: %v, want %v", code, vtrpcpb.Code_INVALID_ARGUMENT)
}

tsv.qe.allowUnsafeDMLs = true
got, err = qre.Execute()
if err != nil {
t.Fatalf("qre.Execute() = %v, want nil", err)
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got: %v, want: %v", got, want)
}
}

func TestQueryExecutorPlanPassDmlReplaceInto(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
Expand Down
Loading