diff --git a/go/test/endtoend/tabletmanager/tablet_test.go b/go/test/endtoend/tabletmanager/tablet_test.go index 8277ee13aed..8e6a1ad5a5e 100644 --- a/go/test/endtoend/tabletmanager/tablet_test.go +++ b/go/test/endtoend/tabletmanager/tablet_test.go @@ -20,12 +20,40 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" ) +// TestEnsureDB tests that vttablet creates the db as needed +func TestEnsureDB(t *testing.T) { + defer cluster.PanicHandler(t) + + // Create new tablet + tablet := clusterInstance.NewVttabletInstance("replica", 0, "") + tablet.MysqlctlProcess = *cluster.MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, clusterInstance.TmpDirectory) + err := tablet.MysqlctlProcess.Start() + require.NoError(t, err) + + log.Info(fmt.Sprintf("Started vttablet %v", tablet)) + // Start vttablet process as replica. It won't be able to serve because there's no db. + err = clusterInstance.StartVttablet(tablet, "NOT_SERVING", false, cell, "dbtest", hostname, "0") + require.NoError(t, err) + + // Make it the master. + err = clusterInstance.VtctlclientProcess.ExecuteCommand("TabletExternallyReparented", tablet.Alias) + require.NoError(t, err) + + // It goes SERVING because TER calls ChangeTabletType which will also set the database to read-write + assert.Equal(t, "SERVING", tablet.VttabletProcess.GetTabletStatus()) + status := tablet.VttabletProcess.GetStatusDetails() + assert.Contains(t, status, "Serving") + + killTablets(t, tablet) +} + // TestLocalMetadata tests the contents of local_metadata table after vttablet startup func TestLocalMetadata(t *testing.T) { defer cluster.PanicHandler(t) diff --git a/go/test/endtoend/vreplication/config.go b/go/test/endtoend/vreplication/config.go index c22a486e39c..d539252d5d0 100644 --- a/go/test/endtoend/vreplication/config.go +++ b/go/test/endtoend/vreplication/config.go @@ -4,7 +4,7 @@ var ( initialProductSchema = ` create table product(pid int, description varbinary(128), primary key(pid)); create table customer(cid int, name varbinary(128), primary key(cid)); -create table merchant(mname varchar(128), category varchar(128), primary key(mname)); +create table merchant(mname varchar(128), category varchar(128), primary key(mname)) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; create table orders(oid int, cid int, pid int, mname varchar(128), price int, primary key(oid)); create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; create table order_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; diff --git a/go/test/endtoend/vtgate/setstatement/sysvar_test.go b/go/test/endtoend/vtgate/setstatement/sysvar_test.go index 0eccc545ec6..424e2f65e3f 100644 --- a/go/test/endtoend/vtgate/setstatement/sysvar_test.go +++ b/go/test/endtoend/vtgate/setstatement/sysvar_test.go @@ -192,6 +192,36 @@ func TestSetSystemVariableAndThenSuccessfulTx(t *testing.T) { assertMatches(t, conn, "select @@sql_safe_updates", "[[INT64(1)]]") } +func TestSetSystemVariableAndThenSuccessfulAutocommitDML(t *testing.T) { + vtParams := mysql.ConnParams{ + Host: "localhost", + Port: clusterInstance.VtgateMySQLPort, + } + + conn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + defer conn.Close() + checkedExec(t, conn, `delete from test`) + + checkedExec(t, conn, `set sql_safe_updates = 1`) + + checkedExec(t, conn, `insert into test (id, val1) values (80, null)`) + assertMatches(t, conn, `select id, val1 from test`, `[[INT64(80) NULL]]`) + assertMatches(t, conn, `select @@sql_safe_updates`, `[[INT64(1)]]`) + + checkedExec(t, conn, `update test set val2 = 2 where val1 is null`) + assertMatches(t, conn, `select id, val1, val2 from test`, `[[INT64(80) NULL INT32(2)]]`) + assertMatches(t, conn, `select @@sql_safe_updates`, `[[INT64(1)]]`) + + checkedExec(t, conn, `update test set val1 = 'text' where val1 is null`) + assertMatches(t, conn, `select id, val1, val2 from test`, `[[INT64(80) VARCHAR("text") INT32(2)]]`) + assertMatches(t, conn, `select @@sql_safe_updates`, `[[INT64(1)]]`) + + checkedExec(t, conn, `delete from test where val1 = 'text'`) + assertMatches(t, conn, `select id, val1, val2 from test`, `[]`) + assertMatches(t, conn, `select @@sql_safe_updates`, `[[INT64(1)]]`) +} + func TestStartTxAndSetSystemVariableAndThenSuccessfulCommit(t *testing.T) { vtParams := mysql.ConnParams{ Host: "localhost", diff --git a/go/vt/dbconfigs/dbconfigs.go b/go/vt/dbconfigs/dbconfigs.go index efabbe9d5e8..2c5d5441531 100644 --- a/go/vt/dbconfigs/dbconfigs.go +++ b/go/vt/dbconfigs/dbconfigs.go @@ -223,6 +223,11 @@ func (dbcfgs *DBConfigs) AppDebugWithDB() Connector { return dbcfgs.makeParams(&dbcfgs.appdebugParams, true) } +// AllPrivsConnector returns connection parameters for appdebug with no dbname set. +func (dbcfgs *DBConfigs) AllPrivsConnector() Connector { + return dbcfgs.makeParams(&dbcfgs.allprivsParams, false) +} + // AllPrivsWithDB returns connection parameters for appdebug with dbname set. func (dbcfgs *DBConfigs) AllPrivsWithDB() Connector { return dbcfgs.makeParams(&dbcfgs.allprivsParams, true) diff --git a/go/vt/dbconfigs/dbconfigs_test.go b/go/vt/dbconfigs/dbconfigs_test.go index 57f5b695b45..04acb3f9c15 100644 --- a/go/vt/dbconfigs/dbconfigs_test.go +++ b/go/vt/dbconfigs/dbconfigs_test.go @@ -227,6 +227,9 @@ func TestAccessors(t *testing.T) { if got, want := dbc.AppWithDB().connParams.DbName, "db"; got != want { t.Errorf("dbc.AppWithDB().DbName: %v, want %v", got, want) } + if got, want := dbc.AllPrivsConnector().connParams.DbName, ""; got != want { + t.Errorf("dbc.AllPrivsWithDB().DbName: %v, want %v", got, want) + } if got, want := dbc.AllPrivsWithDB().connParams.DbName, "db"; got != want { t.Errorf("dbc.AllPrivsWithDB().DbName: %v, want %v", got, want) } diff --git a/go/vt/sqlparser/ast.go b/go/vt/sqlparser/ast.go index 60a699d993a..67dc784275f 100644 --- a/go/vt/sqlparser/ast.go +++ b/go/vt/sqlparser/ast.go @@ -1537,17 +1537,17 @@ func (node Exprs) Format(buf *TrackedBuffer) { // Format formats the node. func (node *AndExpr) Format(buf *TrackedBuffer) { - buf.astPrintf(node, "%v and %v", node.Left, node.Right) + buf.astPrintf(node, "%l and %r", node.Left, node.Right) } // Format formats the node. func (node *OrExpr) Format(buf *TrackedBuffer) { - buf.astPrintf(node, "%v or %v", node.Left, node.Right) + buf.astPrintf(node, "%l or %r", node.Left, node.Right) } // Format formats the node. func (node *XorExpr) Format(buf *TrackedBuffer) { - buf.astPrintf(node, "%v xor %v", node.Left, node.Right) + buf.astPrintf(node, "%l xor %r", node.Left, node.Right) } // Format formats the node. @@ -1557,7 +1557,7 @@ func (node *NotExpr) Format(buf *TrackedBuffer) { // Format formats the node. func (node *ComparisonExpr) Format(buf *TrackedBuffer) { - buf.astPrintf(node, "%v %s %v", node.Left, node.Operator, node.Right) + buf.astPrintf(node, "%l %s %r", node.Left, node.Operator, node.Right) if node.Escape != nil { buf.astPrintf(node, " escape %v", node.Escape) } @@ -1565,7 +1565,7 @@ func (node *ComparisonExpr) Format(buf *TrackedBuffer) { // Format formats the node. func (node *RangeCond) Format(buf *TrackedBuffer) { - buf.astPrintf(node, "%v %s %v and %v", node.Left, node.Operator, node.From, node.To) + buf.astPrintf(node, "%v %s %l and %r", node.Left, node.Operator, node.From, node.To) } // Format formats the node. @@ -1635,7 +1635,7 @@ func (node ListArg) Format(buf *TrackedBuffer) { // Format formats the node. func (node *BinaryExpr) Format(buf *TrackedBuffer) { - buf.astPrintf(node, "%v %s %v", node.Left, node.Operator, node.Right) + buf.astPrintf(node, "%l %s %r", node.Left, node.Operator, node.Right) } // Format formats the node. diff --git a/go/vt/sqlparser/precedence_test.go b/go/vt/sqlparser/precedence_test.go index c0676dee9be..a74c44f4e12 100644 --- a/go/vt/sqlparser/precedence_test.go +++ b/go/vt/sqlparser/precedence_test.go @@ -142,6 +142,17 @@ func TestParens(t *testing.T) { {in: "(a | b) between (5) and (7)", expected: "a | b between 5 and 7"}, {in: "(a and b) between (5) and (7)", expected: "(a and b) between 5 and 7"}, {in: "(true is true) is null", expected: "(true is true) is null"}, + {in: "3 * (100 div 3)", expected: "3 * (100 div 3)"}, + {in: "100 div 2 div 2", expected: "100 div 2 div 2"}, + {in: "100 div (2 div 2)", expected: "100 div (2 div 2)"}, + {in: "(100 div 2) div 2", expected: "100 div 2 div 2"}, + {in: "((((((1000))))))", expected: "1000"}, + {in: "100 - (50 + 10)", expected: "100 - (50 + 10)"}, + {in: "100 - 50 + 10", expected: "100 - 50 + 10"}, + {in: "true and (true and true)", expected: "true and (true and true)"}, + {in: "10 - 2 - 1", expected: "10 - 2 - 1"}, + {in: "(10 - 2) - 1", expected: "10 - 2 - 1"}, + {in: "10 - (2 - 1)", expected: "10 - (2 - 1)"}, } for _, tc := range tests { diff --git a/go/vt/sqlparser/tracked_buffer.go b/go/vt/sqlparser/tracked_buffer.go index 8f5fef1f689..a8230360139 100644 --- a/go/vt/sqlparser/tracked_buffer.go +++ b/go/vt/sqlparser/tracked_buffer.go @@ -56,7 +56,9 @@ func (buf *TrackedBuffer) WriteNode(node SQLNode) *TrackedBuffer { // Myprintf mimics fmt.Fprintf(buf, ...), but limited to Node(%v), // Node.Value(%s) and string(%s). It also allows a %a for a value argument, in // which case it adds tracking info for future substitutions. -// It adds parens as needed to follow precedence rules when printing expressions +// It adds parens as needed to follow precedence rules when printing expressions. +// To handle parens correctly for left associative binary operators, +// use %l and %r to tell the TrackedBuffer which value is on the LHS and RHS // // The name must be something other than the usual Printf() to avoid "go vet" // warnings due to our custom format specifiers. @@ -87,7 +89,8 @@ func (buf *TrackedBuffer) astPrintf(currentNode SQLNode, format string, values . break } i++ // '%' - switch format[i] { + token := format[i] + switch token { case 'c': switch v := values[fieldnum].(type) { case byte: @@ -106,19 +109,19 @@ func (buf *TrackedBuffer) astPrintf(currentNode SQLNode, format string, values . default: panic(fmt.Sprintf("unexpected TrackedBuffer type %T", v)) } - case 'v': + case 'l', 'r', 'v': + left := token != 'r' value := values[fieldnum] expr := getExpressionForParensEval(checkParens, value) - if expr != nil { // - needParens := needParens(currentExpr, expr) + if expr == nil { + buf.formatter(value.(SQLNode)) + } else { + needParens := needParens(currentExpr, expr, left) buf.printIf(needParens, "(") buf.formatter(expr) buf.printIf(needParens, ")") - } else { - buf.formatter(value.(SQLNode)) } - case 'a': buf.WriteArg(values[fieldnum].(string)) default: @@ -153,7 +156,16 @@ func (buf *TrackedBuffer) formatter(node SQLNode) { } } -func needParens(op, val Expr) bool { +//needParens says if we need a parenthesis +// op is the operator we are printing +// val is the value we are checking if we need parens around or not +// left let's us know if the value is on the lhs or rhs of the operator +func needParens(op, val Expr, left bool) bool { + // Values are atomic and never need parens + if IsValue(val) { + return false + } + if areBothISExpr(op, val) { return true } @@ -161,7 +173,17 @@ func needParens(op, val Expr) bool { opBinding := precedenceFor(op) valBinding := precedenceFor(val) - return !(opBinding == Syntactic || valBinding == Syntactic) && valBinding > opBinding + if opBinding == Syntactic || valBinding == Syntactic { + return false + } + + if left { + // for left associative operators, if the value is to the left of the operator, + // we only need parens if the order is higher for the value expression + return valBinding > opBinding + } + + return valBinding >= opBinding } func areBothISExpr(op Expr, val Expr) bool { diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go index ded421e99b7..260d04b8493 100644 --- a/go/vt/vtgate/plugin_mysql_server.go +++ b/go/vt/vtgate/plugin_mysql_server.go @@ -147,20 +147,25 @@ func startSpanTestable(ctx context.Context, query, label string, newSpanFromString func(context.Context, string, string) (trace.Span, context.Context, error)) (trace.Span, context.Context, error) { _, comments := sqlparser.SplitMarginComments(query) match := r.FindStringSubmatch(comments.Leading) + span, ctx := getSpan(ctx, match, newSpan, label, newSpanFromString) + + trace.AnnotateSQL(span, query) + + return span, ctx, nil +} + +func getSpan(ctx context.Context, match []string, newSpan func(context.Context, string) (trace.Span, context.Context), label string, newSpanFromString func(context.Context, string, string) (trace.Span, context.Context, error)) (trace.Span, context.Context) { var span trace.Span - if len(match) == 0 { - span, ctx = newSpan(ctx, label) - } else { + if len(match) != 0 { var err error span, ctx, err = newSpanFromString(ctx, match[1], label) - if err != nil { - return nil, nil, err + if err == nil { + return span, ctx } + log.Warningf("Unable to parse VT_SPAN_CONTEXT: %s", err.Error()) } - - trace.AnnotateSQL(span, query) - - return span, ctx, nil + span, ctx = newSpan(ctx, label) + return span, ctx } func startSpan(ctx context.Context, query, label string) (trace.Span, context.Context, error) { diff --git a/go/vt/vtgate/plugin_mysql_server_test.go b/go/vt/vtgate/plugin_mysql_server_test.go index 711b1ff861b..eebec610326 100644 --- a/go/vt/vtgate/plugin_mysql_server_test.go +++ b/go/vt/vtgate/plugin_mysql_server_test.go @@ -17,6 +17,7 @@ limitations under the License. package vtgate import ( + "fmt" "io/ioutil" "os" "path" @@ -170,6 +171,12 @@ func newFromStringFail(t *testing.T) func(ctx context.Context, parentSpan string } } +func newFromStringError(t *testing.T) func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) { + return func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) { + return trace.NoopSpan{}, context.Background(), fmt.Errorf("") + } +} + func newFromStringExpect(t *testing.T, expected string) func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) { return func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) { assert.Equal(t, expected, parentSpan) @@ -206,6 +213,18 @@ func TestSpanContextPassedInEvenAroundOtherComments(t *testing.T) { assert.NoError(t, err) } +func TestSpanContextNotParsable(t *testing.T) { + hasRun := false + _, _, err := startSpanTestable(context.Background(), "/*VT_SPAN_CONTEXT=123*/SQL QUERY", "someLabel", + func(c context.Context, s string) (trace.Span, context.Context) { + hasRun = true + return trace.NoopSpan{}, context.Background() + }, + newFromStringError(t)) + assert.NoError(t, err) + assert.True(t, hasRun, "Should have continued execution despite failure to parse VT_SPAN_CONTEXT") +} + func newTestAuthServerStatic() *mysql.AuthServerStatic { jsonConfig := "{\"user1\":{\"Password\":\"password1\", \"UserData\":\"userData1\", \"SourceHost\":\"localhost\"}}" return mysql.NewAuthServerStatic("", jsonConfig, 0) diff --git a/go/vt/vtgate/safe_session.go b/go/vt/vtgate/safe_session.go index a4de83364ee..ea386da1fc5 100644 --- a/go/vt/vtgate/safe_session.go +++ b/go/vt/vtgate/safe_session.go @@ -214,7 +214,10 @@ func (session *SafeSession) AppendOrUpdate(shardSession *vtgatepb.Session_ShardS session.mu.Lock() defer session.mu.Unlock() - if session.autocommitState == autocommitted { + // additional check of transaction id is required + // as now in autocommit mode there can be session due to reserved connection + // that needs to be stored as shard session. + if session.autocommitState == autocommitted && shardSession.TransactionId != 0 { // Should be unreachable return vterrors.New(vtrpcpb.Code_INTERNAL, "BUG: SafeSession.AppendOrUpdate: unexpected autocommit state") } diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 1db7e7dccde..240f8772232 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -249,10 +249,6 @@ func (tm *TabletManager) InitMaster(ctx context.Context) (string, error) { // Set the server read-write, from now on we can accept real // client writes. Note that if semi-sync replication is enabled, // we'll still need some replicas to be able to commit transactions. - if err := tm.MysqlDaemon.SetReadOnly(false); err != nil { - return "", err - } - if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_MASTER); err != nil { return "", err } @@ -739,12 +735,6 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context) (string, error) { return "", err } - // We call SetReadOnly only after the topo has been updated to avoid - // situations where two tablets are master at the DB level but not at the vitess level - if err := tm.MysqlDaemon.SetReadOnly(false); err != nil { - return "", err - } - return mysql.EncodePosition(pos), nil } diff --git a/go/vt/vttablet/tabletmanager/tm_state.go b/go/vt/vttablet/tabletmanager/tm_state.go index fad4a8b9048..80a4d5e05b5 100644 --- a/go/vt/vttablet/tabletmanager/tm_state.go +++ b/go/vt/vttablet/tabletmanager/tm_state.go @@ -163,6 +163,11 @@ func (ts *tmState) ChangeTabletType(ctx context.Context, tabletType topodatapb.T if err != nil { return err } + // We call SetReadOnly only after the topo has been updated to avoid + // situations where two tablets are master at the DB level but not at the vitess level + if err := ts.tm.MysqlDaemon.SetReadOnly(false); err != nil { + return err + } ts.tablet.Type = tabletType ts.tablet.MasterTermStartTime = masterTermStartTime diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index d033c0fac5c..cdbcb635d9a 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/dbconnpool" "vitess.io/vitess/go/vt/vtgate/evalengine" "golang.org/x/net/context" @@ -40,6 +41,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) @@ -71,6 +73,9 @@ type Engine struct { conns *connpool.Pool ticks *timer.Timer + + // dbCreationFailed is for preventing log spam. + dbCreationFailed bool } // NewEngine creates a new Engine. @@ -110,6 +115,49 @@ func (se *Engine) InitDBConfig(cp dbconfigs.Connector) { se.cp = cp } +// EnsureConnectionAndDB ensures that we can connect to mysql. +// If tablet type is master and there is no db, then the database is created. +// This function can be called before opening the Engine. +func (se *Engine) EnsureConnectionAndDB(tabletType topodatapb.TabletType) error { + ctx := tabletenv.LocalContext() + conn, err := dbconnpool.NewDBConnection(ctx, se.env.Config().DB.AppWithDB()) + if err == nil { + conn.Close() + se.dbCreationFailed = false + return nil + } + if tabletType != topodatapb.TabletType_MASTER { + return err + } + if merr, isSQLErr := err.(*mysql.SQLError); !isSQLErr || merr.Num != mysql.ERBadDb { + return err + } + + // We are master and db is not found. Let's create it. + // We use allprivs instead of DBA because we want db create to fail if we're read-only. + conn, err = dbconnpool.NewDBConnection(ctx, se.env.Config().DB.AllPrivsConnector()) + if err != nil { + return vterrors.Wrap(err, "allprivs connection failed") + } + defer conn.Close() + + dbname := se.env.Config().DB.DBName + _, err = conn.ExecuteFetch(fmt.Sprintf("create database if not exists `%s`", dbname), 1, false) + if err != nil { + if !se.dbCreationFailed { + // This is the first failure. + log.Errorf("db creation failed for %v: %v, will keep retrying", dbname, err) + se.dbCreationFailed = true + } + return err + } + + se.dbCreationFailed = false + log.Infof("db %v created", dbname) + se.dbCreationFailed = false + return nil +} + // Open initializes the Engine. Calling Open on an already // open engine is a no-op. func (se *Engine) Open() error { diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 00537d6e70b..9135dc38b8c 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -123,6 +123,7 @@ type stateManager struct { type ( schemaEngine interface { + EnsureConnectionAndDB(topodatapb.TabletType) error Open() error MakeNonMaster() Close() @@ -395,7 +396,7 @@ func (sm *stateManager) VerifyTarget(ctx context.Context, target *querypb.Target func (sm *stateManager) serveMaster() error { sm.watcher.Close() - if err := sm.connect(); err != nil { + if err := sm.connect(topodatapb.TabletType_MASTER); err != nil { return err } @@ -414,7 +415,7 @@ func (sm *stateManager) unserveMaster() error { sm.watcher.Close() - if err := sm.connect(); err != nil { + if err := sm.connect(topodatapb.TabletType_MASTER); err != nil { return err } @@ -428,7 +429,7 @@ func (sm *stateManager) serveNonMaster(wantTabletType topodatapb.TabletType) err sm.tracker.Close() sm.se.MakeNonMaster() - if err := sm.connect(); err != nil { + if err := sm.connect(wantTabletType); err != nil { return err } @@ -446,7 +447,7 @@ func (sm *stateManager) unserveNonMaster(wantTabletType topodatapb.TabletType) e sm.se.MakeNonMaster() - if err := sm.connect(); err != nil { + if err := sm.connect(wantTabletType); err != nil { return err } @@ -456,8 +457,8 @@ func (sm *stateManager) unserveNonMaster(wantTabletType topodatapb.TabletType) e return nil } -func (sm *stateManager) connect() error { - if err := sm.qe.IsMySQLReachable(); err != nil { +func (sm *stateManager) connect(tabletType topodatapb.TabletType) error { + if err := sm.se.EnsureConnectionAndDB(tabletType); err != nil { return err } if err := sm.se.Open(); err != nil { diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 438fd347757..319ea21e772 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -79,7 +79,7 @@ func TestStateManagerServeMaster(t *testing.T) { verifySubcomponent(t, 9, sm.messager, testStateOpen) assert.False(t, sm.se.(*testSchemaEngine).nonMaster) - assert.True(t, sm.qe.(*testQueryEngine).isReachable) + assert.True(t, sm.se.(*testSchemaEngine).ensureCalled) assert.False(t, sm.qe.(*testQueryEngine).stopServing) assert.Equal(t, topodatapb.TabletType_MASTER, sm.target.TabletType) @@ -286,7 +286,7 @@ func TestStateManagerTransitionFailRetry(t *testing.T) { transitionRetryInterval = 10 * time.Millisecond sm := newTestStateManager(t) - sm.qe.(*testQueryEngine).failMySQL = true + sm.se.(*testSchemaEngine).failMySQL = true err := sm.SetServingType(topodatapb.TabletType_MASTER, testNow, StateServing, "") require.Error(t, err) @@ -613,7 +613,19 @@ func (tos testOrderState) State() testState { type testSchemaEngine struct { testOrderState - nonMaster bool + ensureCalled bool + nonMaster bool + + failMySQL bool +} + +func (te *testSchemaEngine) EnsureConnectionAndDB(tabletType topodatapb.TabletType) error { + if te.failMySQL { + te.failMySQL = false + return errors.New("intentional error") + } + te.ensureCalled = true + return nil } func (te *testSchemaEngine) Open() error { @@ -658,7 +670,6 @@ func (te *testReplTracker) Status() (time.Duration, error) { type testQueryEngine struct { testOrderState - isReachable bool stopServing bool failMySQL bool @@ -675,7 +686,6 @@ func (te *testQueryEngine) IsMySQLReachable() error { te.failMySQL = false return errors.New("intentional error") } - te.isReachable = true return nil } diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index f68e8da3c0f..1e195414720 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -117,7 +117,7 @@ func init() { flag.BoolVar(¤tConfig.TerseErrors, "queryserver-config-terse-errors", defaultConfig.TerseErrors, "prevent bind vars from escaping in returned errors") flag.StringVar(&deprecatedPoolNamePrefix, "pool-name-prefix", "", "Deprecated") flag.BoolVar(¤tConfig.WatchReplication, "watch_replication_stream", false, "When enabled, vttablet will stream the MySQL replication stream from the local server, and use it to update schema when it sees a DDL.") - flag.BoolVar(¤tConfig.TrackSchemaVersions, "track_schema_versions", true, "When enabled, vttablet will store versions of schemas at each position that a DDL is applied and allow retrieval of the schema corresponding to a position") + flag.BoolVar(¤tConfig.TrackSchemaVersions, "track_schema_versions", false, "When enabled, vttablet will store versions of schemas at each position that a DDL is applied and allow retrieval of the schema corresponding to a position") flag.BoolVar(&deprecatedAutocommit, "enable-autocommit", true, "This flag is deprecated. Autocommit is always allowed.") flag.BoolVar(¤tConfig.TwoPCEnable, "twopc_enable", defaultConfig.TwoPCEnable, "if the flag is on, 2pc is enabled. Other 2pc flags must be supplied.") flag.StringVar(¤tConfig.TwoPCCoordinatorAddress, "twopc_coordinator_address", defaultConfig.TwoPCCoordinatorAddress, "address of the (VTGate) process(es) that will be used to notify of abandoned transactions.") diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go index bcb5e684a30..082c02c794f 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go @@ -190,7 +190,7 @@ func TestFlags(t *testing.T) { StreamBufferSize: 32768, QueryCacheSize: 5000, SchemaReloadIntervalSeconds: 1800, - TrackSchemaVersions: true, + TrackSchemaVersions: false, MessagePostponeParallelism: 4, CacheResultFields: true, TxThrottlerConfig: "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n", diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 850799bc572..202f1732062 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vtgate/evalengine" "github.com/golang/protobuf/proto" @@ -358,10 +360,20 @@ func (df *vdiff) buildTablePlan(table *tabletmanagerdatapb.TableDefinition, quer for _, pk := range table.PrimaryKeyColumns { found := false for i, selExpr := range targetSelect.SelectExprs { - colname := selExpr.(*sqlparser.AliasedExpr).Expr.(*sqlparser.ColName).Name.Lowered() - if pk == colname { + expr := selExpr.(*sqlparser.AliasedExpr).Expr + var colname string + switch ct := expr.(type) { + case *sqlparser.ColName: + colname = ct.Name.String() + case *sqlparser.FuncExpr: //eg. weight_string() + colname = ct.Name.String() + default: + log.Warningf("Unhandled type found for column in vdiff: %v(%v)", selExpr, ct) + colname = "" + } + if strings.EqualFold(pk, colname) { td.comparePKs = append(td.comparePKs, td.compareCols[i]) - // We'll be comparing pks seperately. So, remove them from compareCols. + // We'll be comparing pks separately. So, remove them from compareCols. td.compareCols[i] = -1 found = true break