diff --git a/data/test/tabletserver/exec_cases.txt b/data/test/tabletserver/exec_cases.txt index aa644d63609..133d8df567d 100644 --- a/data/test/tabletserver/exec_cases.txt +++ b/data/test/tabletserver/exec_cases.txt @@ -25,7 +25,7 @@ "FullQuery": "select distinct * from a limit :#maxLimit" } -# grouy by +# group by "select * from a group by b" { "PlanID": "PASS_SELECT", @@ -186,6 +186,15 @@ "FullQuery": "insert into b.a(eid, id) values (1, :a)" } +# insert cross-db +options:PassthroughDMLs +"insert into b.a (eid, id) values (1, :a)" +{ + "PlanID": "PASS_DML", + "TableName": "", + "FullQuery": "insert into b.a(eid, id) values (1, :a)" +} + # insert with bind value "insert into a (eid, id) values (1, :a)" { @@ -196,6 +205,15 @@ "PKValues": [[1], [":a"]] } +# insert with bind value +options:PassthroughDMLs +"insert into a (eid, id) values (1, :a)" +{ + "PlanID": "PASS_DML", + "TableName": "", + "FullQuery": "insert into a(eid, id) values (1, :a)" +} + # default number "insert into a (id) values (1)" { @@ -307,6 +325,15 @@ "PKValues": [[1], [2]] } +# upsert multiple unique index +options:PassthroughDMLs +"insert into a (eid, id) values (1, 2) on duplicate key update name = func(a)" +{ + "PlanID": "PASS_DML", + "TableName": "", + "FullQuery": "insert into a(eid, id) values (1, 2) on duplicate key update name = func(a)" +} + # upsert single unique index "insert into b (eid, id) values (1, 2) on duplicate key update name = func(a)" { @@ -610,6 +637,15 @@ "FullQuery": "replace into b(eid, id) values (1, 2), (3, 4)" } +# multi-row +options:PassthroughDMLs +"replace into b (eid, id) values (1, 2), (3, 4)" +{ + "PlanID": "PASS_DML", + "TableName": "", + "FullQuery": "replace into b(eid, id) values (1, 2), (3, 4)" +} + # single-row with set "replace into b set eid = 1, id = 2" { @@ -648,6 +684,15 @@ "WhereClause": " where name in ('a', 'b')" } +# update limit with pk +options:PassthroughDMLs +"update d set foo='foo' where name in ('a', 'b') limit 1" +{ + "PlanID": "PASS_DML", + "TableName": "", + "FullQuery": "update d set foo = 'foo' where name in ('a', 'b') limit 1" +} + # update cross-db "update b.a set name='foo' where eid=1 and id=1" { @@ -657,6 +702,15 @@ "FullQuery": "update b.a set name = 'foo' where eid = 1 and id = 1" } +# update cross-db +options:PassthroughDMLs +"update b.a set name='foo' where eid=1 and id=1" +{ + "PlanID": "PASS_DML", + "TableName": "", + "FullQuery": "update b.a set name = 'foo' where eid = 1 and id = 1" +} + # multi-table update "update a, b set a.name = 'foo' where a.id = b.id and b.var = 'test'" { @@ -865,6 +919,15 @@ "WhereClause": " where name in ('a', 'b')" } +# delete limit with pk +options:PassthroughDMLs +"delete from d where name in ('a', 'b') limit 1" +{ + "PlanID": "PASS_DML", + "TableName": "", + "FullQuery": "delete from d where name in ('a', 'b') limit 1" +} + # delete cross-db "delete from b.a where eid=1 and id=1" { @@ -907,6 +970,15 @@ "WhereClause": " where eid = 1 and id = 1" } +# pk +options:PassthroughDMLs +"delete from a where eid=1 and id=1" +{ + "PlanID": "PASS_DML", + "TableName": "", + "FullQuery": "delete from a where eid = 1 and id = 1" +} + # partial pk "delete from a where eid=1" { diff --git a/go/vt/vttablet/tabletserver/planbuilder/dml.go b/go/vt/vttablet/tabletserver/planbuilder/dml.go index f8863f4bc56..39f4b874411 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/dml.go +++ b/go/vt/vttablet/tabletserver/planbuilder/dml.go @@ -32,6 +32,10 @@ func analyzeUpdate(upd *sqlparser.Update, tables map[string]*schema.Table) (plan FullQuery: GenerateFullQuery(upd), } + if PassthroughDMLs { + return plan, nil + } + if len(upd.TableExprs) > 1 { plan.Reason = ReasonMultiTable return plan, nil @@ -95,6 +99,10 @@ func analyzeDelete(del *sqlparser.Delete, tables map[string]*schema.Table) (plan FullQuery: GenerateFullQuery(del), } + if PassthroughDMLs { + return plan, nil + } + if len(del.TableExprs) > 1 { plan.Reason = ReasonMultiTable return plan, nil @@ -292,6 +300,10 @@ func analyzeInsert(ins *sqlparser.Insert, tables map[string]*schema.Table) (plan PlanID: PlanPassDML, FullQuery: GenerateFullQuery(ins), } + if PassthroughDMLs { + return plan, nil + } + if ins.Action == sqlparser.ReplaceStr { plan.Reason = ReasonReplace return plan, nil diff --git a/go/vt/vttablet/tabletserver/planbuilder/plan.go b/go/vt/vttablet/tabletserver/planbuilder/plan.go index d76326d92d3..8f20563c53f 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/plan.go +++ b/go/vt/vttablet/tabletserver/planbuilder/plan.go @@ -32,6 +32,9 @@ var ( // ErrTooComplex indicates given sql query is too complex. ErrTooComplex = vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "Complex") execLimit = &sqlparser.Limit{Rowcount: sqlparser.NewValArg([]byte(":#maxLimit"))} + + // PassthroughDMLs will return PlanPassDML for all update or delete statements + PassthroughDMLs = false ) //_______________________________________________ @@ -49,6 +52,9 @@ const ( PlanNextval // PlanPassDML is pass through update & delete statements. This is // the default plan for update and delete statements. + // If PassthroughDMLs is true, then it is used for all DML statements + // and is valid in all replication modes. + // Otherwise is only allowed in row based replication mode PlanPassDML // PlanDMLPK is an update or delete with an equality where clause(s) // on primary key(s). diff --git a/go/vt/vttablet/tabletserver/planbuilder/plan_test.go b/go/vt/vttablet/tabletserver/planbuilder/plan_test.go index 211cd3fed1f..78577f3137d 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/plan_test.go +++ b/go/vt/vttablet/tabletserver/planbuilder/plan_test.go @@ -76,7 +76,12 @@ func toJSON(p *Plan) ([]byte, error) { func TestPlan(t *testing.T) { testSchema := loadSchema("schema_test.json") for tcase := range iterateExecFile("exec_cases.txt") { + if strings.Contains(tcase.options, "PassthroughDMLs") { + PassthroughDMLs = true + } plan, err := Build(tcase.input, testSchema) + PassthroughDMLs = false + var out string if err != nil { out = err.Error() @@ -232,10 +237,11 @@ func loadSchema(name string) map[string]*schema.Table { } type testCase struct { - file string - lineno int - input string - output string + file string + lineno int + options string + input string + output string } func iterateExecFile(name string) (testCaseIterator chan testCase) { @@ -250,6 +256,7 @@ func iterateExecFile(name string) (testCaseIterator chan testCase) { r := bufio.NewReader(fd) lineno := 0 + options := "" for { binput, err := r.ReadBytes('\n') if err != nil { @@ -265,6 +272,11 @@ func iterateExecFile(name string) (testCaseIterator chan testCase) { //fmt.Printf("%s\n", input) continue } + + if strings.HasPrefix(input, "options:") { + options = input[8:] + continue + } err = json.Unmarshal(binput, &input) if err != nil { fmt.Printf("Line: %d, input: %s\n", lineno, binput) @@ -293,7 +305,8 @@ func iterateExecFile(name string) (testCaseIterator chan testCase) { break } } - testCaseIterator <- testCase{name, lineno, input, string(output)} + testCaseIterator <- testCase{name, lineno, options, input, string(output)} + options = "" } }() return testCaseIterator diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index d304ef50733..e5c1461e8ac 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -135,6 +135,8 @@ type QueryEngine struct { maxResultSize sync2.AtomicInt64 warnResultSize sync2.AtomicInt64 maxDMLRows sync2.AtomicInt64 + passthroughDMLs sync2.AtomicBool + allowUnsafeDMLs bool streamBufferSize sync2.AtomicInt64 // tableaclExemptCount count the number of accesses allowed // based on membership in the superuser ACL @@ -209,6 +211,9 @@ func NewQueryEngine(checker connpool.MySQLChecker, se *schema.Engine, config tab qe.maxDMLRows = sync2.NewAtomicInt64(int64(config.MaxDMLRows)) qe.streamBufferSize = sync2.NewAtomicInt64(int64(config.StreamBufferSize)) + qe.passthroughDMLs = sync2.NewAtomicBool(config.PassthroughDMLs) + planbuilder.PassthroughDMLs = config.PassthroughDMLs + qe.accessCheckerLogger = logutil.NewThrottledLogger("accessChecker", 1*time.Second) qeOnce.Do(func() { diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index d4270158fc7..faf8a2fb2d2 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -121,7 +121,7 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) { defer conn.Recycle() switch qre.plan.PlanID { case planbuilder.PlanPassDML: - if qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow { + if !qre.tsv.qe.allowUnsafeDMLs && (qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow) { return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cannot identify primary key of statement") } return qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, nil, false, true) @@ -259,7 +259,7 @@ func (qre *QueryExecutor) execDmlAutoCommit() (reply *sqltypes.Result, err error return qre.execAsTransaction(func(conn *TxConnection) (reply *sqltypes.Result, err error) { switch qre.plan.PlanID { case planbuilder.PlanPassDML: - if qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow { + if !qre.tsv.qe.allowUnsafeDMLs && (qre.tsv.qe.binlogFormat != connpool.BinlogFormatRow) { return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cannot identify primary key of statement") } reply, err = qre.txFetch(conn, qre.plan.FullQuery, qre.bindVars, nil, nil, false, true) diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index fe39ec1772e..031a54e3575 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -105,6 +105,62 @@ func TestQueryExecutorPlanPassDmlRBR(t *testing.T) { testCommitHelper(t, tsv, qre) } +func TestQueryExecutorPassthroughDml(t *testing.T) { + db := setUpQueryExecutorTest(t) + defer db.Close() + planbuilder.PassthroughDMLs = true + defer func() { planbuilder.PassthroughDMLs = false }() + query := "update test_table set pk = foo()" + want := &sqltypes.Result{} + db.AddQuery(query, want) + ctx := context.Background() + // RBR mode + tsv := newTestTabletServer(ctx, noFlags, db) + defer tsv.StopService() + + planbuilder.PassthroughDMLs = true + defer func() { planbuilder.PassthroughDMLs = false }() + tsv.qe.passthroughDMLs.Set(true) + tsv.qe.binlogFormat = connpool.BinlogFormatRow + + txid := newTransaction(tsv, nil) + qre := newTestQueryExecutor(ctx, tsv, query, txid) + + checkPlanID(t, planbuilder.PlanPassDML, qre.plan.PlanID) + got, err := qre.Execute() + if err != nil { + t.Fatalf("qre.Execute() = %v, want nil", err) + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("got: %v, want: %v", got, want) + } + wantqueries := []string{query} + gotqueries := fetchRecordedQueries(qre) + if !reflect.DeepEqual(gotqueries, wantqueries) { + t.Errorf("queries: %v, want %v", gotqueries, wantqueries) + } + + // Statement mode also works when allowUnsafeDMLs is true + tsv.qe.binlogFormat = connpool.BinlogFormatStatement + _, err = qre.Execute() + if code := vterrors.Code(err); code != vtrpcpb.Code_UNIMPLEMENTED { + t.Errorf("qre.Execute: %v, want %v", code, vtrpcpb.Code_INVALID_ARGUMENT) + } + + tsv.qe.allowUnsafeDMLs = true + got, err = qre.Execute() + if !reflect.DeepEqual(got, want) { + t.Fatalf("got: %v, want: %v", got, want) + } + wantqueries = []string{query, query} + gotqueries = fetchRecordedQueries(qre) + if !reflect.DeepEqual(gotqueries, wantqueries) { + t.Errorf("queries: %v, want %v", gotqueries, wantqueries) + } + + testCommitHelper(t, tsv, qre) +} + func TestQueryExecutorPlanPassDmlAutoCommitRBR(t *testing.T) { db := setUpQueryExecutorTest(t) defer db.Close() @@ -134,6 +190,49 @@ func TestQueryExecutorPlanPassDmlAutoCommitRBR(t *testing.T) { } } +func TestQueryExecutorPassthroughDmlAutoCommit(t *testing.T) { + db := setUpQueryExecutorTest(t) + defer db.Close() + query := "update test_table set pk = foo()" + want := &sqltypes.Result{} + db.AddQuery(query, want) + ctx := context.Background() + // RBR mode + tsv := newTestTabletServer(ctx, noFlags, db) + defer tsv.StopService() + + planbuilder.PassthroughDMLs = true + defer func() { planbuilder.PassthroughDMLs = false }() + tsv.qe.passthroughDMLs.Set(true) + tsv.qe.binlogFormat = connpool.BinlogFormatRow + + qre := newTestQueryExecutor(ctx, tsv, query, 0) + checkPlanID(t, planbuilder.PlanPassDML, qre.plan.PlanID) + got, err := qre.Execute() + if err != nil { + t.Fatalf("qre.Execute() = %v, want nil", err) + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("got: %v, want: %v", got, want) + } + + // Statement mode + tsv.qe.binlogFormat = connpool.BinlogFormatStatement + _, err = qre.Execute() + if code := vterrors.Code(err); code != vtrpcpb.Code_UNIMPLEMENTED { + t.Errorf("qre.Execute: %v, want %v", code, vtrpcpb.Code_INVALID_ARGUMENT) + } + + tsv.qe.allowUnsafeDMLs = true + got, err = qre.Execute() + if err != nil { + t.Fatalf("qre.Execute() = %v, want nil", err) + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("got: %v, want: %v", got, want) + } +} + func TestQueryExecutorPlanPassDmlReplaceInto(t *testing.T) { db := setUpQueryExecutorTest(t) defer db.Close() diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index 2baf3f81637..66370d87651 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -55,7 +55,9 @@ func init() { flag.Float64Var(&Config.TxShutDownGracePeriod, "transaction_shutdown_grace_period", DefaultQsConfig.TxShutDownGracePeriod, "how long to wait (in seconds) for transactions to complete during graceful shutdown.") flag.IntVar(&Config.MaxResultSize, "queryserver-config-max-result-size", DefaultQsConfig.MaxResultSize, "query server max result size, maximum number of rows allowed to return from vttablet for non-streaming queries.") flag.IntVar(&Config.WarnResultSize, "queryserver-config-warn-result-size", DefaultQsConfig.WarnResultSize, "query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this") - flag.IntVar(&Config.MaxDMLRows, "queryserver-config-max-dml-rows", DefaultQsConfig.MaxDMLRows, "query server max dml rows per statement, maximum number of rows allowed to return at a time for an upadte or delete with either 1) an equality where clauses on primary keys, or 2) a subselect statement. For update and delete statements in above two categories, vttablet will split the original query into multiple small queries based on this configuration value. ") + flag.IntVar(&Config.MaxDMLRows, "queryserver-config-max-dml-rows", DefaultQsConfig.MaxDMLRows, "query server max dml rows per statement, maximum number of rows allowed to return at a time for an update or delete with either 1) an equality where clauses on primary keys, or 2) a subselect statement. For update and delete statements in above two categories, vttablet will split the original query into multiple small queries based on this configuration value. ") + flag.BoolVar(&Config.PassthroughDMLs, "queryserver-config-passthrough-dmls", DefaultQsConfig.PassthroughDMLs, "query server pass through all dml statements without rewriting") + flag.IntVar(&Config.StreamBufferSize, "queryserver-config-stream-buffer-size", DefaultQsConfig.StreamBufferSize, "query server stream buffer size, the maximum number of bytes sent from vttablet for each stream call. It's recommended to keep this value in sync with vtgate's stream_buffer_size.") flag.IntVar(&Config.QueryPlanCacheSize, "queryserver-config-query-cache-size", DefaultQsConfig.QueryPlanCacheSize, "query server query cache size, maximum number of queries to be cached. vttablet analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache.") flag.Float64Var(&Config.SchemaReloadTime, "queryserver-config-schema-reload-time", DefaultQsConfig.SchemaReloadTime, "query server schema reload time, how often vttablet reloads schemas from underlying MySQL instance in seconds. vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.") @@ -128,6 +130,7 @@ type TabletConfig struct { MaxResultSize int WarnResultSize int MaxDMLRows int + PassthroughDMLs bool StreamBufferSize int QueryPlanCacheSize int SchemaReloadTime float64 @@ -194,6 +197,7 @@ var DefaultQsConfig = TabletConfig{ MaxResultSize: 10000, WarnResultSize: 0, MaxDMLRows: 500, + PassthroughDMLs: false, QueryPlanCacheSize: 5000, SchemaReloadTime: 30 * 60, QueryTimeout: 30,