diff --git a/docker/vttestserver/run.sh b/docker/vttestserver/run.sh index 1d04399bfa1..a9a11768ec9 100755 --- a/docker/vttestserver/run.sh +++ b/docker/vttestserver/run.sh @@ -40,7 +40,7 @@ rm -vf "$VTDATAROOT"/"$tablet_dir"/{mysql.sock,mysql.sock.lock} -foreign_key_mode "${FOREIGN_KEY_MODE:-allow}" \ -enable_online_ddl="${ENABLE_ONLINE_DDL:-true}" \ -enable_direct_ddl="${ENABLE_DIRECT_DDL:-true}" \ - -planner_version="${PLANNER_VERSION:-v3}" \ + -planner-version="${PLANNER_VERSION:-v3}" \ -vschema_ddl_authorized_users=% \ -schema_dir="/vt/schema/" diff --git a/go/cmd/vtcombo/main.go b/go/cmd/vtcombo/main.go index 20ac7ad795d..a18c5a0460b 100644 --- a/go/cmd/vtcombo/main.go +++ b/go/cmd/vtcombo/main.go @@ -28,6 +28,9 @@ import ( "strings" "time" + "vitess.io/vitess/go/vt/env" + "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" + "vitess.io/vitess/go/vt/vttest" "google.golang.org/protobuf/proto" @@ -53,24 +56,22 @@ import ( ) var ( - tpb vttestpb.VTTestTopology - - schemaDir = flag.String("schema_dir", "", "Schema base directory. Should contain one directory per keyspace, with a vschema.json file if necessary.") - - startMysql = flag.Bool("start_mysql", false, "Should vtcombo also start mysql") - - mysqlPort = flag.Int("mysql_port", 3306, "mysql port") - + schemaDir = flag.String("schema_dir", "", "Schema base directory. Should contain one directory per keyspace, with a vschema.json file if necessary.") + startMysql = flag.Bool("start_mysql", false, "Should vtcombo also start mysql") + mysqlPort = flag.Int("mysql_port", 3306, "mysql port") externalTopoServer = flag.Bool("external_topo_server", false, "Should vtcombo use an external topology server instead of starting its own in-memory topology server. "+ "If true, vtcombo will use the flags defined in topo/server.go to open topo server") + plannerVersion = flag.String("planner-version", "gen4", "Sets the default planner to use when the session has not changed it. Valid values are: V3, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the gen4 planner and falls back to the V3 planner if the gen4 fails.") + plannerVersionDeprecated = flag.String("planner_version", "", "Deprecated flag. Use planner-version instead") + tpb vttestpb.VTTestTopology ts *topo.Server resilientServer *srvtopo.ResilientServer ) func init() { flag.Var(vttest.TextTopoData(&tpb), "proto_topo", "vttest proto definition of the topology, encoded in compact text format. See vttest.proto for more information.") - flag.Var(vttest.JsonTopoData(&tpb), "json_topo", "vttest proto definition of the topology, encoded in json format. See vttest.proto for more information.") + flag.Var(vttest.JSONTopoData(&tpb), "json_topo", "vttest proto definition of the topology, encoded in json format. See vttest.proto for more information.") servenv.RegisterDefaultFlags() } @@ -241,12 +242,17 @@ func main() { topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY, } + version, err := env.CheckPlannerVersionFlag(plannerVersion, plannerVersionDeprecated) + if err != nil { + log.Exitf("failed to get planner version from flags: %v", err) + } + plannerVersion, _ := plancontext.PlannerNameToVersion(version) vtgate.QueryLogHandler = "/debug/vtgate/querylog" vtgate.QueryLogzHandler = "/debug/vtgate/querylogz" vtgate.QueryzHandler = "/debug/vtgate/queryz" // pass nil for healthcheck, it will get created - vtg := vtgate.Init(context.Background(), nil, resilientServer, tpb.Cells[0], tabletTypesToWait) + vtg := vtgate.Init(context.Background(), nil, resilientServer, tpb.Cells[0], tabletTypesToWait, plannerVersion) // vtctld configuration and init err = vtctld.InitVtctld(ts) diff --git a/go/cmd/vtexplain/vtexplain.go b/go/cmd/vtexplain/vtexplain.go index d055c9c6df3..36f73180def 100644 --- a/go/cmd/vtexplain/vtexplain.go +++ b/go/cmd/vtexplain/vtexplain.go @@ -21,6 +21,8 @@ import ( "fmt" "os" + "vitess.io/vitess/go/vt/env" + "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" "vitess.io/vitess/go/exit" @@ -50,8 +52,8 @@ var ( normalize = flag.Bool("normalize", false, "Whether to enable vtgate normalization") outputMode = flag.String("output-mode", "text", "Output in human-friendly text or json") dbName = flag.String("dbname", "", "Optional database target to override normal routing") - badPlannerVersion = flag.String("planner-version", "", "Deprecated flag. Use planner_version instead") - plannerVersionStr = flag.String("planner_version", "gen4", "Sets the query planner version to use when generating the explain output. Valid values are V3 and Gen4") + plannerVersionStr = flag.String("planner-version", "gen4", "Sets the query planner version to use when generating the explain output. Valid values are V3 and Gen4") + badPlannerVersion = flag.String("planner_version", "", "Deprecated flag. Use planner-version instead") // vtexplainFlags lists all the flags that should show in usage vtexplainFlags = []string{ @@ -147,14 +149,12 @@ func parseAndRun() error { return err } - if badPlannerVersion != nil { - if plannerVersionStr != nil && *badPlannerVersion != *plannerVersionStr { - return fmt.Errorf("can't specify planner-version and planner_version with different versions") - } - log.Warningf("planner-version is deprecated. please use planner_version instead") - plannerVersionStr = badPlannerVersion + verStr, err := env.CheckPlannerVersionFlag(plannerVersionStr, badPlannerVersion) + if err != nil { + return err } - plannerVersion, _ := plancontext.PlannerNameToVersion(*plannerVersionStr) + + plannerVersion, _ := plancontext.PlannerNameToVersion(verStr) if plannerVersion != querypb.ExecuteOptions_V3 && plannerVersion != querypb.ExecuteOptions_Gen4 { return fmt.Errorf("invalid value specified for planner-version of '%s' -- valid values are V3 and Gen4", *plannerVersionStr) } diff --git a/go/cmd/vtgate/vtgate.go b/go/cmd/vtgate/vtgate.go index 3a8b73f9c33..fbacaab76f0 100644 --- a/go/cmd/vtgate/vtgate.go +++ b/go/cmd/vtgate/vtgate.go @@ -23,6 +23,9 @@ import ( "strings" "time" + "vitess.io/vitess/go/vt/env" + "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" @@ -39,8 +42,10 @@ import ( ) var ( - cell = flag.String("cell", "test_nj", "cell to use") - tabletTypesToWait = flag.String("tablet_types_to_wait", "", "wait till connected for specified tablet types during Gateway initialization") + cell = flag.String("cell", "test_nj", "cell to use") + tabletTypesToWait = flag.String("tablet_types_to_wait", "", "wait till connected for specified tablet types during Gateway initialization") + plannerVersion = flag.String("planner-version", "gen4", "Sets the default planner to use when the session has not changed it. Valid values are: V3, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the gen4 planner and falls back to the V3 planner if the gen4 fails.") + plannerVersionDeprecated = flag.String("planner_version", "", "Deprecated flag. Use planner-version instead") ) var resilientServer *srvtopo.ResilientServer @@ -142,8 +147,14 @@ func main() { log.Exitf("cells_to_watch validation failed: %v", err) } + version, err := env.CheckPlannerVersionFlag(plannerVersion, plannerVersionDeprecated) + if err != nil { + log.Exitf("failed to get planner version from flags: %v", err) + } + plannerVersion, _ := plancontext.PlannerNameToVersion(version) + // pass nil for HealthCheck and it will be created - vtg := vtgate.Init(context.Background(), nil, resilientServer, *cell, tabletTypes) + vtg := vtgate.Init(context.Background(), nil, resilientServer, *cell, tabletTypes, plannerVersion) servenv.OnRun(func() { // Flags are parsed now. Parse the template using the actual flag value and overwrite the current template. diff --git a/go/cmd/vttestserver/main.go b/go/cmd/vttestserver/main.go index 2add4b74564..09b12fff2d0 100644 --- a/go/cmd/vttestserver/main.go +++ b/go/cmd/vttestserver/main.go @@ -138,7 +138,8 @@ func init() { flag.StringVar(&config.Charset, "charset", "utf8mb4", "MySQL charset") - flag.StringVar(&config.PlannerVersion, "planner_version", "gen4", "Sets the default planner to use when the session has not changed it. Valid values are: V3, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the new gen4 planner and falls back to the V3 planner if the gen4 fails.") + flag.StringVar(&config.PlannerVersion, "planner-version", "gen4", "Sets the default planner to use when the session has not changed it. Valid values are: V3, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the new gen4 planner and falls back to the V3 planner if the gen4 fails.") + flag.StringVar(&config.PlannerVersionDeprecated, "planner_version", "", "planner_version is deprecated. Please use planner-version instead") flag.StringVar(&config.SnapshotFile, "snapshot_file", "", "A MySQL DB snapshot file") diff --git a/go/flags/endtoend/flags_test.go b/go/flags/endtoend/flags_test.go index fd310d9ae1a..693b6a07e2b 100644 --- a/go/flags/endtoend/flags_test.go +++ b/go/flags/endtoend/flags_test.go @@ -279,8 +279,10 @@ var ( URI of opentsdb /api/put method --pid_file string If set, the process will write its pid to the named file, and delete it on graceful shutdown. - --planner_version string + --planner-version string Sets the default planner to use when the session has not changed it. Valid values are: V3, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the gen4 planner and falls back to the V3 planner if the gen4 fails. (default gen4) + --planner_version string + Deprecated flag. Use planner-version instead --port int port for the server --pprof string diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go index fb02faf3821..b63a8cb6f5f 100644 --- a/go/test/endtoend/cluster/vtgate_process.go +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -87,7 +87,7 @@ func (vtgate *VtgateProcess) Setup() (err error) { "--mysql_auth_server_impl", vtgate.MySQLAuthServerImpl, } if vtgate.PlannerVersion > 0 { - args = append(args, "--planner_version", vtgate.PlannerVersion.String()) + args = append(args, "--planner-version", vtgate.PlannerVersion.String()) } if vtgate.SysVarSetEnabled { args = append(args, "--enable_system_settings") diff --git a/go/test/endtoend/tabletmanager/qps_test.go b/go/test/endtoend/tabletmanager/qps_test.go index fb086a899a4..f54021176e3 100644 --- a/go/test/endtoend/tabletmanager/qps_test.go +++ b/go/test/endtoend/tabletmanager/qps_test.go @@ -60,9 +60,7 @@ func TestQPS(t *testing.T) { // after that we'll see 0.0 QPS rates again. If this becomes actually // flaky, we need to read continuously in a separate thread. - n := 0 - for n < 15 { - n++ + for n := 0; n < 15; n++ { // Run queries via vtGate so that they are counted. utils.Exec(t, vtGateConn, "select * from t1") } diff --git a/go/test/endtoend/vtcombo/vttest_sample_test.go b/go/test/endtoend/vtcombo/vttest_sample_test.go index 4229ef0a87c..8e359b7849a 100644 --- a/go/test/endtoend/vtcombo/vttest_sample_test.go +++ b/go/test/endtoend/vtcombo/vttest_sample_test.go @@ -84,7 +84,7 @@ func TestMain(m *testing.M) { exitcode, err := func() (int, error) { var topology vttestpb.VTTestTopology - data := vttest.JsonTopoData(&topology) + data := vttest.JSONTopoData(&topology) err := data.Set(jsonTopo) if err != nil { return 1, err @@ -253,7 +253,7 @@ func assertTabletsPresent(t *testing.T) { } parts := strings.Split(line, " ") if parts[1] == "routed" { - numRouted += 1 + numRouted++ continue } diff --git a/go/test/endtoend/vtgate/partialfailure/main_test.go b/go/test/endtoend/vtgate/partialfailure/main_test.go index 8d1e2f82b96..103847dcb2f 100644 --- a/go/test/endtoend/vtgate/partialfailure/main_test.go +++ b/go/test/endtoend/vtgate/partialfailure/main_test.go @@ -124,7 +124,7 @@ func TestMain(m *testing.M) { } // Start vtgate - clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--planner_version", "Gen4Fallback") + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--planner-version", "Gen4Fallback") if err := clusterInstance.StartVtgate(); err != nil { return 1 } diff --git a/go/vt/env/env.go b/go/vt/env/env.go index 70feb43186c..7bfe6719696 100644 --- a/go/vt/env/env.go +++ b/go/vt/env/env.go @@ -24,6 +24,8 @@ import ( "path" "path/filepath" "strings" + + "vitess.io/vitess/go/vt/log" ) const ( @@ -100,3 +102,19 @@ func VtMysqlBaseDir() (string, error) { } return root, nil } + +// CheckPlannerVersionFlag takes two string references and checks that just one +// has a value or that they agree with each other. +func CheckPlannerVersionFlag(correct, deprecated *string) (string, error) { + if deprecated != nil && *deprecated != "" { + if correct != nil && *deprecated != *correct { + return "", fmt.Errorf("can't specify planner-version and planner_version with different versions") + } + log.Warningf("planner_version is deprecated. please use planner-version instead") + return *deprecated, nil + } + if correct == nil { + return "", nil + } + return *correct, nil +} diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index 47421ec39f2..e78b31514e7 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -71,7 +71,7 @@ func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts streamSize := 10 var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests - vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false /*do not warn for sharded only*/, streamSize, cache.DefaultConfig, schemaTracker, false /*no-scatter*/) + vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion) return nil } diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 06f486196a5..3c52284898f 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -30,6 +30,8 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" + "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/acl" @@ -89,6 +91,7 @@ type Executor struct { resolver *Resolver scatterConn *ScatterConn txConn *TxConn + pv plancontext.PlannerVersion mu sync.Mutex vschema *vindexes.VSchema @@ -113,7 +116,18 @@ const pathScatterStats = "/debug/scatter_stats" const pathVSchema = "/debug/vschema" // NewExecutor creates a new Executor. -func NewExecutor(ctx context.Context, serv srvtopo.Server, cell string, resolver *Resolver, normalize, warnOnShardedOnly bool, streamSize int, cacheCfg *cache.Config, schemaTracker SchemaInfo, noScatter bool) *Executor { +func NewExecutor( + ctx context.Context, + serv srvtopo.Server, + cell string, + resolver *Resolver, + normalize, warnOnShardedOnly bool, + streamSize int, + cacheCfg *cache.Config, + schemaTracker SchemaInfo, + noScatter bool, + pv plancontext.PlannerVersion, +) *Executor { e := &Executor{ serv: serv, cell: cell, @@ -126,6 +140,7 @@ func NewExecutor(ctx context.Context, serv srvtopo.Server, cell string, resolver streamSize: streamSize, schemaTracker: schemaTracker, allowScatter: !noScatter, + pv: pv, } vschemaacl.Init() @@ -1221,7 +1236,7 @@ func (e *Executor) prepare(ctx context.Context, safeSession *SafeSession, sql st func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *LogStats) ([]*querypb.Field, error) { // V3 mode. query, comments := sqlparser.SplitMarginComments(sql) - vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly) + vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv) plan, err := e.getPlan( vcursor, query, diff --git a/go/vt/vtgate/executor_dml_test.go b/go/vt/vtgate/executor_dml_test.go index 987719ccb1c..7ef792f13e9 100644 --- a/go/vt/vtgate/executor_dml_test.go +++ b/go/vt/vtgate/executor_dml_test.go @@ -131,7 +131,7 @@ func TestUpdateEqual(t *testing.T) { func TestUpdateFromSubQuery(t *testing.T) { executor, sbc1, sbc2, _ := createExecutorEnv() - + executor.pv = querypb.ExecuteOptions_Gen4 logChan := QueryLogger.Subscribe("Test") defer QueryLogger.Unsubscribe(logChan) diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index c699ccc7a5a..448348ada8c 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -469,7 +469,7 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn bad.VSchema = badVSchema getSandbox(KsTestUnsharded).VSchema = unshardedVSchema - executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false) + executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) key.AnyShardPicker = DestinationAnyShardPickerFirstShard{} // create a new session each time so that ShardSessions don't get re-used across tests @@ -493,7 +493,7 @@ func createCustomExecutor(vschema string) (executor *Executor, sbc1, sbc2, sbclo sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) getSandbox(KsTestUnsharded).VSchema = unshardedVSchema - executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false) + executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) // create a new session each time so that ShardSessions don't get re-used across tests primarySession = &vtgatepb.Session{ TargetString: "@primary", @@ -522,7 +522,7 @@ func createCustomExecutorSetValues(vschema string, values []*sqltypes.Result) (e sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) getSandbox(KsTestUnsharded).VSchema = unshardedVSchema - executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false) + executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) // create a new session each time so that ShardSessions don't get re-used across tests primarySession = &vtgatepb.Session{ TargetString: "@primary", diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 0c239e3c24b..33604c2718c 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -464,11 +464,7 @@ func TestCreateTableValidTimestamp(t *testing.T) { func TestGen4SelectDBA(t *testing.T) { executor, sbc1, _, _ := createExecutorEnv() executor.normalize = true - *plannerVersion = "gen4" - defer func() { - // change it back to v3 - *plannerVersion = "v3" - }() + executor.pv = querypb.ExecuteOptions_Gen4 query := "select * from INFORMATION_SCHEMA.foo" _, err := executor.Execute(context.Background(), "TestSelectDBA", @@ -1487,7 +1483,7 @@ func TestStreamSelectIN(t *testing.T) { } func createExecutor(serv *sandboxTopo, cell string, resolver *Resolver) *Executor { - return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false) + return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) } func TestSelectScatter(t *testing.T) { @@ -2985,7 +2981,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) { count++ } - executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false) + executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) before := runtime.NumGoroutine() query := "select id, col from user order by id limit 2" @@ -3048,12 +3044,7 @@ func TestSelectScatterFails(t *testing.T) { func TestGen4SelectStraightJoin(t *testing.T) { executor, sbc1, _, _ := createExecutorEnv() executor.normalize = true - *plannerVersion = "gen4" - defer func() { - // change it back to v3 - *plannerVersion = "v3" - }() - + executor.pv = querypb.ExecuteOptions_Gen4 session := NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select u.id from user u straight_join user2 u2 on u.id = u2.id" _, err := executor.Execute(context.Background(), @@ -3081,11 +3072,7 @@ func TestGen4SelectStraightJoin(t *testing.T) { func TestGen4MultiColumnVindexEqual(t *testing.T) { executor, sbc1, sbc2, _ := createExecutorEnv() executor.normalize = true - *plannerVersion = "gen4" - defer func() { - // change it back to v3 - *plannerVersion = "v3" - }() + executor.pv = querypb.ExecuteOptions_Gen4 session := NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select * from user_region where cola = 1 and colb = 2" @@ -3132,11 +3119,7 @@ func TestGen4MultiColumnVindexEqual(t *testing.T) { func TestGen4MultiColumnVindexIn(t *testing.T) { executor, sbc1, sbc2, _ := createExecutorEnv() executor.normalize = true - *plannerVersion = "gen4" - defer func() { - // change it back to v3 - *plannerVersion = "v3" - }() + executor.pv = querypb.ExecuteOptions_Gen4 session := NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select * from user_region where cola IN (1,17984) and colb IN (2,3,4)" @@ -3179,11 +3162,7 @@ func TestGen4MultiColumnVindexIn(t *testing.T) { func TestGen4MultiColMixedColComparision(t *testing.T) { executor, sbc1, sbc2, _ := createExecutorEnv() executor.normalize = true - *plannerVersion = "gen4" - defer func() { - // change it back to v3 - *plannerVersion = "v3" - }() + executor.pv = querypb.ExecuteOptions_Gen4 session := NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select * from user_region where colb = 2 and cola IN (1,17984)" @@ -3224,11 +3203,7 @@ func TestGen4MultiColMixedColComparision(t *testing.T) { func TestGen4MultiColBestVindexSel(t *testing.T) { executor, sbc1, sbc2, _ := createExecutorEnv() executor.normalize = true - *plannerVersion = "gen4" - defer func() { - // change it back to v3 - *plannerVersion = "v3" - }() + executor.pv = querypb.ExecuteOptions_Gen4 session := NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select * from user_region where colb = 2 and cola IN (1,17984) and cola = 1" @@ -3282,11 +3257,7 @@ func TestGen4MultiColBestVindexSel(t *testing.T) { func TestGen4MultiColMultiEqual(t *testing.T) { executor, sbc1, sbc2, _ := createExecutorEnv() executor.normalize = true - *plannerVersion = "gen4" - defer func() { - // change it back to v3 - *plannerVersion = "v3" - }() + executor.pv = querypb.ExecuteOptions_Gen4 session := NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select * from user_region where (cola,colb) in ((17984,2),(17984,3))" @@ -3313,10 +3284,6 @@ func TestGen4MultiColMultiEqual(t *testing.T) { func TestRegionRange(t *testing.T) { // Special setup: Don't use createExecutorEnv. - *plannerVersion = "gen4" - defer func() { - *plannerVersion = "v3" - }() cell := "regioncell" ks := "TestExecutor" hc := discovery.NewFakeHealthCheck(nil) @@ -3332,6 +3299,7 @@ func TestRegionRange(t *testing.T) { conns = append(conns, sbc) } executor := createExecutor(serv, cell, resolver) + executor.pv = querypb.ExecuteOptions_Gen4 tcases := []struct { regionID int @@ -3368,8 +3336,6 @@ func TestRegionRange(t *testing.T) { func TestMultiCol(t *testing.T) { // Special setup: Don't use createLegacyExecutorEnv. - - *plannerVersion = "gen4" cell := "multicol" ks := "TestMultiCol" hc := discovery.NewFakeHealthCheck(nil) @@ -3385,6 +3351,7 @@ func TestMultiCol(t *testing.T) { conns = append(conns, sbc) } executor := createExecutor(serv, cell, resolver) + executor.pv = querypb.ExecuteOptions_Gen4 tcases := []struct { cola, colb, colc int @@ -3448,7 +3415,6 @@ var multiColVschema = ` func TestMultiColPartial(t *testing.T) { // Special setup: Don't use createLegacyExecutorEnv. - *plannerVersion = "gen4" cell := "multicol" ks := "TestMultiCol" hc := discovery.NewFakeHealthCheck(nil) @@ -3464,6 +3430,7 @@ func TestMultiColPartial(t *testing.T) { conns = append(conns, sbc) } executor := createExecutor(serv, cell, resolver) + executor.pv = querypb.ExecuteOptions_Gen4 tcases := []struct { where string @@ -3510,7 +3477,6 @@ func TestMultiColPartial(t *testing.T) { func TestSelectAggregationNoData(t *testing.T) { // Special setup: Don't use createExecutorEnv. - *plannerVersion = "gen4" cell := "aa" hc := discovery.NewFakeHealthCheck(nil) createSandbox(KsTestSharded).VSchema = executorVSchema @@ -3524,6 +3490,7 @@ func TestSelectAggregationNoData(t *testing.T) { conns = append(conns, sbc) } executor := createExecutor(serv, cell, resolver) + executor.pv = querypb.ExecuteOptions_Gen4 tcases := []struct { sql string @@ -3594,7 +3561,6 @@ func TestSelectAggregationNoData(t *testing.T) { func TestSelectAggregationData(t *testing.T) { // Special setup: Don't use createExecutorEnv. - *plannerVersion = "gen4" cell := "aa" hc := discovery.NewFakeHealthCheck(nil) createSandbox(KsTestSharded).VSchema = executorVSchema @@ -3608,6 +3574,7 @@ func TestSelectAggregationData(t *testing.T) { conns = append(conns, sbc) } executor := createExecutor(serv, cell, resolver) + executor.pv = querypb.ExecuteOptions_Gen4 tcases := []struct { sql string diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go index abce17b242c..8fea4ed985f 100644 --- a/go/vt/vtgate/executor_stream_test.go +++ b/go/vt/vtgate/executor_stream_test.go @@ -20,6 +20,8 @@ import ( "testing" "time" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/cache" "vitess.io/vitess/go/vt/discovery" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -59,7 +61,7 @@ func TestStreamSQLSharded(t *testing.T) { for _, shard := range shards { _ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil) } - executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false) + executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) sql := "stream * from sharded_user_msgs" result, err := executorStreamMessages(executor, sql) diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 1d15704b04d..f2af6566297 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1568,10 +1568,12 @@ func TestVSchemaStats(t *testing.T) { } } +var pv = querypb.ExecuteOptions_Gen4 + func TestGetPlanUnnormalized(t *testing.T) { r, _, _, _ := createExecutorEnv() - emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false) - unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false) + emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) query1 := "select * from music_user_map where id = 1" plan1, logStats1 := getPlanCached(t, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) @@ -1647,7 +1649,7 @@ func getPlanCached(t *testing.T, e *Executor, vcursor *vcursorImpl, sql string, func TestGetPlanCacheUnnormalized(t *testing.T) { r, _, _, _ := createExecutorEnv() - emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false) + emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) query1 := "select * from music_user_map where id = 1" _, logStats1 := getPlanCached(t, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true) @@ -1668,7 +1670,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { // Skip cache using directive r, _, _, _ = createExecutorEnv() - unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false) + unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) query1 = "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" getPlanCached(t, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) @@ -1679,12 +1681,12 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { assertCacheSize(t, r.plans, 1) // the target string will be resolved and become part of the plan cache key, which adds a new entry - ksIDVc1, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false) + ksIDVc1, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) getPlanCached(t, r, ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) // the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above - ksIDVc2, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false) + ksIDVc2, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) getPlanCached(t, r, ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) } @@ -1692,7 +1694,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { func TestGetPlanCacheNormalized(t *testing.T) { r, _, _, _ := createExecutorEnv() r.normalize = true - emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false) + emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) query1 := "select * from music_user_map where id = 1" _, logStats1 := getPlanCached(t, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */) @@ -1711,7 +1713,7 @@ func TestGetPlanCacheNormalized(t *testing.T) { // Skip cache using directive r, _, _, _ = createExecutorEnv() r.normalize = true - unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false) + unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) query1 = "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" getPlanCached(t, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) @@ -1722,12 +1724,12 @@ func TestGetPlanCacheNormalized(t *testing.T) { assertCacheSize(t, r.plans, 1) // the target string will be resolved and become part of the plan cache key, which adds a new entry - ksIDVc1, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false) + ksIDVc1, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) getPlanCached(t, r, ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) // the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above - ksIDVc2, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false) + ksIDVc2, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) getPlanCached(t, r, ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) } @@ -1735,8 +1737,8 @@ func TestGetPlanCacheNormalized(t *testing.T) { func TestGetPlanNormalized(t *testing.T) { r, _, _, _ := createExecutorEnv() r.normalize = true - emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false) - unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false) + emptyvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + unshardedvc, _ := newVCursorImpl(ctx, NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) query1 := "select * from music_user_map where id = 1" query2 := "select * from music_user_map where id = 2" diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 6a15b982a67..6e9cc64703c 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -53,7 +53,7 @@ func (e *Executor) newExecute( } query, comments := sqlparser.SplitMarginComments(sql) - vcursor, err := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly) + vcursor, err := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv) if err != nil { return err } diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index ae4bb1a7d03..90dadb232dc 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -36,7 +36,6 @@ import ( "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -51,7 +50,6 @@ import ( "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/buffer" "vitess.io/vitess/go/vt/vtgate/engine" - "vitess.io/vitess/go/vt/vtgate/planbuilder" "vitess.io/vitess/go/vt/vtgate/semantics" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vtgate/vschemaacl" @@ -113,6 +111,7 @@ type vcursorImpl struct { warnShardedOnly bool // when using sharded only features, a warning will be warnings field warnings []*querypb.QueryWarning // any warnings that are accumulated during the planning phase are stored here + pv plancontext.PlannerVersion } // newVcursorImpl creates a vcursorImpl. Before creating this object, you have to separate out any marginComments that came with @@ -130,6 +129,7 @@ func newVCursorImpl( resolver *srvtopo.Resolver, serv srvtopo.Server, warnShardedOnly bool, + pv plancontext.PlannerVersion, ) (*vcursorImpl, error) { keyspace, tabletType, destination, err := parseDestinationTarget(safeSession.TargetString, vschema) if err != nil { @@ -175,6 +175,7 @@ func newVCursorImpl( vm: vm, topoServer: ts, warnShardedOnly: warnShardedOnly, + pv: pv, }, nil } @@ -413,13 +414,7 @@ func (vc *vcursorImpl) Planner() plancontext.PlannerVersion { vc.safeSession.Options.PlannerVersion != querypb.ExecuteOptions_DEFAULT_PLANNER { return vc.safeSession.Options.PlannerVersion } - version, done := plancontext.PlannerNameToVersion(*plannerVersion) - if done { - return version - } - - log.Warning("unknown planner version configured. using the default") - return planbuilder.V3 + return vc.pv } // GetSemTable implements the ContextVSchema interface diff --git a/go/vt/vtgate/vcursor_impl_test.go b/go/vt/vtgate/vcursor_impl_test.go index 3a0259aab4a..46ca89f763a 100644 --- a/go/vt/vtgate/vcursor_impl_test.go +++ b/go/vt/vtgate/vcursor_impl_test.go @@ -7,6 +7,8 @@ import ( "strconv" "testing" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/proto/vschema" vschemapb "vitess.io/vitess/go/vt/proto/vschema" "vitess.io/vitess/go/vt/srvtopo" @@ -186,7 +188,7 @@ func TestDestinationKeyspace(t *testing.T) { for i, tc := range tests { t.Run(strconv.Itoa(i)+tc.targetString, func(t *testing.T) { - impl, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false) + impl, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4) impl.vschema = tc.vschema dest, keyspace, tabletType, err := impl.TargetDestination(tc.qualifier) if tc.expectedError == "" { @@ -244,7 +246,7 @@ func TestSetTarget(t *testing.T) { for i, tc := range tests { t.Run(fmt.Sprintf("%d#%s", i, tc.targetString), func(t *testing.T) { - vc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false) + vc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4) vc.vschema = tc.vschema err := vc.SetTarget(tc.targetString) if tc.expectedError == "" { @@ -286,7 +288,7 @@ func TestPlanPrefixKey(t *testing.T) { t.Run(fmt.Sprintf("%d#%s", i, tc.targetString), func(t *testing.T) { ss := NewSafeSession(&vtgatepb.Session{InTransaction: false}) ss.SetTargetString(tc.targetString) - vc, err := newVCursorImpl(context.Background(), ss, sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false) + vc, err := newVCursorImpl(context.Background(), ss, sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4) require.NoError(t, err) vc.vschema = tc.vschema require.Equal(t, tc.expectedPlanPrefixKey, vc.planPrefixKey()) @@ -305,7 +307,7 @@ func TestFirstSortedKeyspace(t *testing.T) { ks3Schema.Keyspace.Name: ks3Schema, }} - vc, err := newVCursorImpl(context.Background(), NewSafeSession(nil), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false) + vc, err := newVCursorImpl(context.Background(), NewSafeSession(nil), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4) require.NoError(t, err) ks, err := vc.FirstSortedKeyspace() require.NoError(t, err) diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index dd51eaf971a..f45de1d4f8f 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -26,6 +26,8 @@ import ( "strings" "time" + "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" + "vitess.io/vitess/go/vt/key" "context" @@ -82,7 +84,6 @@ var ( // Put set-passthrough under a flag. sysVarSetEnabled = flag.Bool("enable_system_settings", true, "This will enable the system settings to be changed per session at the database connection level") setVarEnabled = flag.Bool("enable_set_var", true, "This will enable the use of MySQL's SET_VAR query hint for certain system variables instead of using reserved connections") - plannerVersion = flag.String("planner_version", "gen4", "Sets the default planner to use when the session has not changed it. Valid values are: V3, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the gen4 planner and falls back to the V3 planner if the gen4 fails.") // lockHeartbeatTime is used to set the next heartbeat time. lockHeartbeatTime = flag.Duration("lock_heartbeat_time", 5*time.Second, "If there is lock function used. This will keep the lock connection active by using this heartbeat") @@ -140,6 +141,7 @@ type VTGate struct { vsm *vstreamManager txConn *TxConn gw *TabletGateway + pv plancontext.PlannerVersion // stats objects. // TODO(sougou): This needs to be cleaned up. There @@ -160,7 +162,14 @@ type RegisterVTGate func(vtgateservice.VTGateService) var RegisterVTGates []RegisterVTGate // Init initializes VTGate server. -func Init(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string, tabletTypesToWait []topodatapb.TabletType) *VTGate { +func Init( + ctx context.Context, + hc discovery.HealthCheck, + serv srvtopo.Server, + cell string, + tabletTypesToWait []topodatapb.TabletType, + pv plancontext.PlannerVersion, +) *VTGate { if rpcVTGate != nil { log.Fatalf("VTGate already initialized") } @@ -228,6 +237,7 @@ func Init(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, ce cacheCfg, si, *noScatter, + pv, ) // connect the schema tracker with the vschema manager diff --git a/go/vt/vtgate/vtgate_test.go b/go/vt/vtgate/vtgate_test.go index f68b7fda716..877dd75cccc 100644 --- a/go/vt/vtgate/vtgate_test.go +++ b/go/vt/vtgate/vtgate_test.go @@ -75,7 +75,7 @@ func init() { ` hcVTGateTest = discovery.NewFakeHealthCheck(nil) *transactionMode = "MULTI" - Init(context.Background(), hcVTGateTest, new(sandboxTopo), "aa", nil) + Init(context.Background(), hcVTGateTest, new(sandboxTopo), "aa", nil, querypb.ExecuteOptions_Gen4) *mysqlServerPort = 0 *mysqlAuthServerImpl = "none" diff --git a/go/vt/vttest/local_cluster.go b/go/vt/vttest/local_cluster.go index 3bf63d76a92..484acc8c3e7 100644 --- a/go/vt/vttest/local_cluster.go +++ b/go/vt/vttest/local_cluster.go @@ -89,6 +89,9 @@ type Config struct { // Choose between V3, Gen4, Gen4Greedy and Gen4Fallback PlannerVersion string + // PlannerVersionDeprecated is deprecated and should not be used + PlannerVersionDeprecated string + // ExtraMyCnf are the extra .CNF files to be added to the MySQL config ExtraMyCnf []string @@ -223,7 +226,7 @@ func TextTopoData(tpb *vttestpb.VTTestTopology) flag.Value { } } -func JsonTopoData(tpb *vttestpb.VTTestTopology) flag.Value { +func JSONTopoData(tpb *vttestpb.VTTestTopology) flag.Value { return &TopoData{ vtTestTopology: tpb, unmarshal: protojson.Unmarshal, @@ -336,7 +339,7 @@ func (db *LocalCluster) Setup() error { if !db.OnlyMySQL { log.Infof("Starting vtcombo...") - db.vt = VtcomboProcess(db.Env, &db.Config, db.mysql) + db.vt, _ = VtcomboProcess(db.Env, &db.Config, db.mysql) if err := db.vt.WaitStart(); err != nil { return err } diff --git a/go/vt/vttest/vtprocess.go b/go/vt/vttest/vtprocess.go index 1344c6ead41..45b8d47f6bd 100644 --- a/go/vt/vttest/vtprocess.go +++ b/go/vt/vttest/vtprocess.go @@ -27,6 +27,8 @@ import ( "syscall" "time" + "vitess.io/vitess/go/vt/env" + "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/vt/log" @@ -190,16 +192,16 @@ var QueryServerArgs = []string{ // VtcomboProcess returns a VtProcess handle for a local `vtcombo` service, // configured with the given Config. // The process must be manually started by calling WaitStart() -func VtcomboProcess(env Environment, args *Config, mysql MySQLManager) *VtProcess { +func VtcomboProcess(environment Environment, args *Config, mysql MySQLManager) (*VtProcess, error) { vt := &VtProcess{ Name: "vtcombo", - Directory: env.Directory(), - LogDirectory: env.LogDirectory(), - Binary: env.BinaryPath("vtcombo"), - Port: env.PortForProtocol("vtcombo", ""), - PortGrpc: env.PortForProtocol("vtcombo", "grpc"), - HealthCheck: env.ProcessHealthCheck("vtcombo"), - Env: env.EnvVars(), + Directory: environment.Directory(), + LogDirectory: environment.LogDirectory(), + Binary: environment.BinaryPath("vtcombo"), + Port: environment.PortForProtocol("vtcombo", ""), + PortGrpc: environment.PortForProtocol("vtcombo", "grpc"), + HealthCheck: environment.ProcessHealthCheck("vtcombo"), + Env: environment.EnvVars(), } user, pass := mysql.Auth() @@ -208,6 +210,10 @@ func VtcomboProcess(env Environment, args *Config, mysql MySQLManager) *VtProces if charset == "" { charset = DefaultCharset } + verStr, err := env.CheckPlannerVersionFlag(&args.PlannerVersion, &args.PlannerVersionDeprecated) + if err != nil { + return nil, err + } protoTopo, _ := prototext.Marshal(args.Topology) vt.ExtraArgs = append(vt.ExtraArgs, []string{ @@ -223,14 +229,14 @@ func VtcomboProcess(env Environment, args *Config, mysql MySQLManager) *VtProces "--enable_query_plan_field_caching=false", "--dbddl_plugin", "vttest", "--foreign_key_mode", args.ForeignKeyMode, - "--planner_version", args.PlannerVersion, + "--planner-version", verStr, fmt.Sprintf("--enable_online_ddl=%t", args.EnableOnlineDDL), fmt.Sprintf("--enable_direct_ddl=%t", args.EnableDirectDDL), fmt.Sprintf("--enable_system_settings=%t", args.EnableSystemSettings), }...) vt.ExtraArgs = append(vt.ExtraArgs, QueryServerArgs...) - vt.ExtraArgs = append(vt.ExtraArgs, env.VtcomboArguments()...) + vt.ExtraArgs = append(vt.ExtraArgs, environment.VtcomboArguments()...) if args.SchemaDir != "" { vt.ExtraArgs = append(vt.ExtraArgs, []string{"--schema_dir", args.SchemaDir}...) @@ -271,7 +277,7 @@ func VtcomboProcess(env Environment, args *Config, mysql MySQLManager) *VtProces }...) } - vtcomboMysqlPort := env.PortForProtocol("vtcombo_mysql_port", "") + vtcomboMysqlPort := environment.PortForProtocol("vtcombo_mysql_port", "") vtcomboMysqlBindAddress := "localhost" if args.MySQLBindHost != "" { vtcomboMysqlBindAddress = args.MySQLBindHost @@ -292,5 +298,5 @@ func VtcomboProcess(env Environment, args *Config, mysql MySQLManager) *VtProces }...) } - return vt + return vt, nil }