From 91a2751e50e7d491f1f66d571ffc2db1095fc872 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sat, 25 Jul 2020 13:32:46 -0700 Subject: [PATCH 01/15] vttablet: automatically create db if absent This functionality facilitates the use case where we connect to an externally managed mysql, but whose database has not been created yet. If the DB we look for is not found, we can just create it. Signed-off-by: Sugu Sougoumarane --- go/test/endtoend/tabletmanager/tablet_test.go | 32 +++++++++++++ go/vt/dbconfigs/dbconfigs.go | 5 ++ go/vt/dbconfigs/dbconfigs_test.go | 3 ++ go/vt/vttablet/tabletserver/schema/engine.go | 48 +++++++++++++++++++ go/vt/vttablet/tabletserver/state_manager.go | 13 ++--- .../tabletserver/state_manager_test.go | 20 ++++++-- 6 files changed, 110 insertions(+), 11 deletions(-) diff --git a/go/test/endtoend/tabletmanager/tablet_test.go b/go/test/endtoend/tabletmanager/tablet_test.go index 8277ee13aed..a90b6e313b3 100644 --- a/go/test/endtoend/tabletmanager/tablet_test.go +++ b/go/test/endtoend/tabletmanager/tablet_test.go @@ -20,12 +20,44 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" ) +// TestEnsureDB tests that vttablet creates the db as needed +func TestEnsureDB(t *testing.T) { + defer cluster.PanicHandler(t) + + // Create new tablet + tablet := clusterInstance.NewVttabletInstance("replica", 0, "") + tablet.MysqlctlProcess = *cluster.MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, clusterInstance.TmpDirectory) + err := tablet.MysqlctlProcess.Start() + require.NoError(t, err) + + log.Info(fmt.Sprintf("Started vttablet %v", tablet)) + // Start vttablet process as replica. It won't be able to serve because there's no db. + err = clusterInstance.StartVttablet(tablet, "NOT_SERVING", false, cell, "dbtest", hostname, "0") + require.NoError(t, err) + + // Make it the master. + err = clusterInstance.VtctlclientProcess.ExecuteCommand("TabletExternallyReparented", tablet.Alias) + require.NoError(t, err) + + // It will still fail because the db is read-only. + assert.Equal(t, "NOT_SERVING", tablet.VttabletProcess.GetTabletStatus()) + status := tablet.VttabletProcess.GetStatusDetails() + assert.Contains(t, status, "read-only") + + // Switch to read-write and verify that that we go serving. + _ = clusterInstance.VtctlclientProcess.ExecuteCommand("SetReadWrite", tablet.Alias) + err = tablet.VttabletProcess.WaitForTabletType("SERVING") + require.NoError(t, err) + killTablets(t, tablet) +} + // TestLocalMetadata tests the contents of local_metadata table after vttablet startup func TestLocalMetadata(t *testing.T) { defer cluster.PanicHandler(t) diff --git a/go/vt/dbconfigs/dbconfigs.go b/go/vt/dbconfigs/dbconfigs.go index efabbe9d5e8..2c5d5441531 100644 --- a/go/vt/dbconfigs/dbconfigs.go +++ b/go/vt/dbconfigs/dbconfigs.go @@ -223,6 +223,11 @@ func (dbcfgs *DBConfigs) AppDebugWithDB() Connector { return dbcfgs.makeParams(&dbcfgs.appdebugParams, true) } +// AllPrivsConnector returns connection parameters for appdebug with no dbname set. +func (dbcfgs *DBConfigs) AllPrivsConnector() Connector { + return dbcfgs.makeParams(&dbcfgs.allprivsParams, false) +} + // AllPrivsWithDB returns connection parameters for appdebug with dbname set. func (dbcfgs *DBConfigs) AllPrivsWithDB() Connector { return dbcfgs.makeParams(&dbcfgs.allprivsParams, true) diff --git a/go/vt/dbconfigs/dbconfigs_test.go b/go/vt/dbconfigs/dbconfigs_test.go index 57f5b695b45..04acb3f9c15 100644 --- a/go/vt/dbconfigs/dbconfigs_test.go +++ b/go/vt/dbconfigs/dbconfigs_test.go @@ -227,6 +227,9 @@ func TestAccessors(t *testing.T) { if got, want := dbc.AppWithDB().connParams.DbName, "db"; got != want { t.Errorf("dbc.AppWithDB().DbName: %v, want %v", got, want) } + if got, want := dbc.AllPrivsConnector().connParams.DbName, ""; got != want { + t.Errorf("dbc.AllPrivsWithDB().DbName: %v, want %v", got, want) + } if got, want := dbc.AllPrivsWithDB().connParams.DbName, "db"; got != want { t.Errorf("dbc.AllPrivsWithDB().DbName: %v, want %v", got, want) } diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index d033c0fac5c..cdbcb635d9a 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/dbconnpool" "vitess.io/vitess/go/vt/vtgate/evalengine" "golang.org/x/net/context" @@ -40,6 +41,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) @@ -71,6 +73,9 @@ type Engine struct { conns *connpool.Pool ticks *timer.Timer + + // dbCreationFailed is for preventing log spam. + dbCreationFailed bool } // NewEngine creates a new Engine. @@ -110,6 +115,49 @@ func (se *Engine) InitDBConfig(cp dbconfigs.Connector) { se.cp = cp } +// EnsureConnectionAndDB ensures that we can connect to mysql. +// If tablet type is master and there is no db, then the database is created. +// This function can be called before opening the Engine. +func (se *Engine) EnsureConnectionAndDB(tabletType topodatapb.TabletType) error { + ctx := tabletenv.LocalContext() + conn, err := dbconnpool.NewDBConnection(ctx, se.env.Config().DB.AppWithDB()) + if err == nil { + conn.Close() + se.dbCreationFailed = false + return nil + } + if tabletType != topodatapb.TabletType_MASTER { + return err + } + if merr, isSQLErr := err.(*mysql.SQLError); !isSQLErr || merr.Num != mysql.ERBadDb { + return err + } + + // We are master and db is not found. Let's create it. + // We use allprivs instead of DBA because we want db create to fail if we're read-only. + conn, err = dbconnpool.NewDBConnection(ctx, se.env.Config().DB.AllPrivsConnector()) + if err != nil { + return vterrors.Wrap(err, "allprivs connection failed") + } + defer conn.Close() + + dbname := se.env.Config().DB.DBName + _, err = conn.ExecuteFetch(fmt.Sprintf("create database if not exists `%s`", dbname), 1, false) + if err != nil { + if !se.dbCreationFailed { + // This is the first failure. + log.Errorf("db creation failed for %v: %v, will keep retrying", dbname, err) + se.dbCreationFailed = true + } + return err + } + + se.dbCreationFailed = false + log.Infof("db %v created", dbname) + se.dbCreationFailed = false + return nil +} + // Open initializes the Engine. Calling Open on an already // open engine is a no-op. func (se *Engine) Open() error { diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 00537d6e70b..9135dc38b8c 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -123,6 +123,7 @@ type stateManager struct { type ( schemaEngine interface { + EnsureConnectionAndDB(topodatapb.TabletType) error Open() error MakeNonMaster() Close() @@ -395,7 +396,7 @@ func (sm *stateManager) VerifyTarget(ctx context.Context, target *querypb.Target func (sm *stateManager) serveMaster() error { sm.watcher.Close() - if err := sm.connect(); err != nil { + if err := sm.connect(topodatapb.TabletType_MASTER); err != nil { return err } @@ -414,7 +415,7 @@ func (sm *stateManager) unserveMaster() error { sm.watcher.Close() - if err := sm.connect(); err != nil { + if err := sm.connect(topodatapb.TabletType_MASTER); err != nil { return err } @@ -428,7 +429,7 @@ func (sm *stateManager) serveNonMaster(wantTabletType topodatapb.TabletType) err sm.tracker.Close() sm.se.MakeNonMaster() - if err := sm.connect(); err != nil { + if err := sm.connect(wantTabletType); err != nil { return err } @@ -446,7 +447,7 @@ func (sm *stateManager) unserveNonMaster(wantTabletType topodatapb.TabletType) e sm.se.MakeNonMaster() - if err := sm.connect(); err != nil { + if err := sm.connect(wantTabletType); err != nil { return err } @@ -456,8 +457,8 @@ func (sm *stateManager) unserveNonMaster(wantTabletType topodatapb.TabletType) e return nil } -func (sm *stateManager) connect() error { - if err := sm.qe.IsMySQLReachable(); err != nil { +func (sm *stateManager) connect(tabletType topodatapb.TabletType) error { + if err := sm.se.EnsureConnectionAndDB(tabletType); err != nil { return err } if err := sm.se.Open(); err != nil { diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 438fd347757..319ea21e772 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -79,7 +79,7 @@ func TestStateManagerServeMaster(t *testing.T) { verifySubcomponent(t, 9, sm.messager, testStateOpen) assert.False(t, sm.se.(*testSchemaEngine).nonMaster) - assert.True(t, sm.qe.(*testQueryEngine).isReachable) + assert.True(t, sm.se.(*testSchemaEngine).ensureCalled) assert.False(t, sm.qe.(*testQueryEngine).stopServing) assert.Equal(t, topodatapb.TabletType_MASTER, sm.target.TabletType) @@ -286,7 +286,7 @@ func TestStateManagerTransitionFailRetry(t *testing.T) { transitionRetryInterval = 10 * time.Millisecond sm := newTestStateManager(t) - sm.qe.(*testQueryEngine).failMySQL = true + sm.se.(*testSchemaEngine).failMySQL = true err := sm.SetServingType(topodatapb.TabletType_MASTER, testNow, StateServing, "") require.Error(t, err) @@ -613,7 +613,19 @@ func (tos testOrderState) State() testState { type testSchemaEngine struct { testOrderState - nonMaster bool + ensureCalled bool + nonMaster bool + + failMySQL bool +} + +func (te *testSchemaEngine) EnsureConnectionAndDB(tabletType topodatapb.TabletType) error { + if te.failMySQL { + te.failMySQL = false + return errors.New("intentional error") + } + te.ensureCalled = true + return nil } func (te *testSchemaEngine) Open() error { @@ -658,7 +670,6 @@ func (te *testReplTracker) Status() (time.Duration, error) { type testQueryEngine struct { testOrderState - isReachable bool stopServing bool failMySQL bool @@ -675,7 +686,6 @@ func (te *testQueryEngine) IsMySQLReachable() error { te.failMySQL = false return errors.New("intentional error") } - te.isReachable = true return nil } From 0c8c5ac1fee756bb02de700a660ee7b9e48a4514 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 27 Aug 2020 20:49:25 +0200 Subject: [PATCH 02/15] Handle FuncExpr columns Signed-off-by: Rohit Nayak --- go/vt/wrangler/vdiff.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 850799bc572..8055fc37535 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vtgate/evalengine" "github.com/golang/protobuf/proto" @@ -356,12 +358,23 @@ func (df *vdiff) buildTablePlan(table *tabletmanagerdatapb.TableDefinition, quer var orderby sqlparser.OrderBy for _, pk := range table.PrimaryKeyColumns { + log.Infof("vdiff: pk %s", pk) found := false for i, selExpr := range targetSelect.SelectExprs { - colname := selExpr.(*sqlparser.AliasedExpr).Expr.(*sqlparser.ColName).Name.Lowered() - if pk == colname { + expr := selExpr.(*sqlparser.AliasedExpr).Expr + var colname string + switch ct := expr.(type) { + case *sqlparser.ColName: + colname = ct.Name.String() + case *sqlparser.FuncExpr: //eg. weight_string() + colname = ct.Name.String() + default: + log.Warningf("Unhandled type found for column in vdiff: %v(%v)", selExpr, ct) + colname = "" + } + if strings.EqualFold(pk, colname) { td.comparePKs = append(td.comparePKs, td.compareCols[i]) - // We'll be comparing pks seperately. So, remove them from compareCols. + // We'll be comparing pks separately. So, remove them from compareCols. td.compareCols[i] = -1 found = true break From a3359a1d69c7fb5a44abc88666aa4037b3976fce Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 27 Aug 2020 21:32:59 +0200 Subject: [PATCH 03/15] Add collation to e2e test to table with unicode_loose_md5 Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/config.go | 2 +- go/vt/wrangler/vdiff.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/go/test/endtoend/vreplication/config.go b/go/test/endtoend/vreplication/config.go index c22a486e39c..d539252d5d0 100644 --- a/go/test/endtoend/vreplication/config.go +++ b/go/test/endtoend/vreplication/config.go @@ -4,7 +4,7 @@ var ( initialProductSchema = ` create table product(pid int, description varbinary(128), primary key(pid)); create table customer(cid int, name varbinary(128), primary key(cid)); -create table merchant(mname varchar(128), category varchar(128), primary key(mname)); +create table merchant(mname varchar(128), category varchar(128), primary key(mname)) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; create table orders(oid int, cid int, pid int, mname varchar(128), price int, primary key(oid)); create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; create table order_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 8055fc37535..202f1732062 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -358,7 +358,6 @@ func (df *vdiff) buildTablePlan(table *tabletmanagerdatapb.TableDefinition, quer var orderby sqlparser.OrderBy for _, pk := range table.PrimaryKeyColumns { - log.Infof("vdiff: pk %s", pk) found := false for i, selExpr := range targetSelect.SelectExprs { expr := selExpr.(*sqlparser.AliasedExpr).Expr From 272226ee157e5d2873ed25233b3ef75a9edca836 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sun, 13 Sep 2020 15:33:54 +0200 Subject: [PATCH 04/15] Turn off schema tracker by default Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletserver/tabletenv/config.go | 2 +- go/vt/vttablet/tabletserver/tabletenv/config_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index f68e8da3c0f..1e195414720 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -117,7 +117,7 @@ func init() { flag.BoolVar(¤tConfig.TerseErrors, "queryserver-config-terse-errors", defaultConfig.TerseErrors, "prevent bind vars from escaping in returned errors") flag.StringVar(&deprecatedPoolNamePrefix, "pool-name-prefix", "", "Deprecated") flag.BoolVar(¤tConfig.WatchReplication, "watch_replication_stream", false, "When enabled, vttablet will stream the MySQL replication stream from the local server, and use it to update schema when it sees a DDL.") - flag.BoolVar(¤tConfig.TrackSchemaVersions, "track_schema_versions", true, "When enabled, vttablet will store versions of schemas at each position that a DDL is applied and allow retrieval of the schema corresponding to a position") + flag.BoolVar(¤tConfig.TrackSchemaVersions, "track_schema_versions", false, "When enabled, vttablet will store versions of schemas at each position that a DDL is applied and allow retrieval of the schema corresponding to a position") flag.BoolVar(&deprecatedAutocommit, "enable-autocommit", true, "This flag is deprecated. Autocommit is always allowed.") flag.BoolVar(¤tConfig.TwoPCEnable, "twopc_enable", defaultConfig.TwoPCEnable, "if the flag is on, 2pc is enabled. Other 2pc flags must be supplied.") flag.StringVar(¤tConfig.TwoPCCoordinatorAddress, "twopc_coordinator_address", defaultConfig.TwoPCCoordinatorAddress, "address of the (VTGate) process(es) that will be used to notify of abandoned transactions.") diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go index bcb5e684a30..082c02c794f 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go @@ -190,7 +190,7 @@ func TestFlags(t *testing.T) { StreamBufferSize: 32768, QueryCacheSize: 5000, SchemaReloadIntervalSeconds: 1800, - TrackSchemaVersions: true, + TrackSchemaVersions: false, MessagePostponeParallelism: 4, CacheResultFields: true, TxThrottlerConfig: "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n", From 7b075b659261446fc52832127069ee7b4ae48273 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Sat, 19 Sep 2020 10:06:58 +0530 Subject: [PATCH 05/15] end to end test for reserved connection in autocommit mode Signed-off-by: Harshit Gangal --- .../vtgate/setstatement/sysvar_test.go | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/go/test/endtoend/vtgate/setstatement/sysvar_test.go b/go/test/endtoend/vtgate/setstatement/sysvar_test.go index 0eccc545ec6..9a7e98bafc3 100644 --- a/go/test/endtoend/vtgate/setstatement/sysvar_test.go +++ b/go/test/endtoend/vtgate/setstatement/sysvar_test.go @@ -192,6 +192,36 @@ func TestSetSystemVariableAndThenSuccessfulTx(t *testing.T) { assertMatches(t, conn, "select @@sql_safe_updates", "[[INT64(1)]]") } +func TestSetSystemVariableAndThenSuccessfulAutocommitDML(t *testing.T) { + vtParams := mysql.ConnParams{ + Host: "localhost", + Port: clusterInstance.VtgateMySQLPort, + } + + conn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + defer conn.Close() + checkedExec(t, conn, `delete from test`) + + checkedExec(t, conn, `set sql_safe_updates = 1`) + + checkedExec(t, conn, `insert into test (id, val1) values (80, null)`) + assertMatches(t, conn, `select id, val1 from test`, `[[INT64(80) NULL]]`) + assertMatches(t, conn, `select @@sql_safe_updates`, `[[INT64(1)]]`) + + checkedExec(t, conn, `update test set val2 = 2 where val1 is null`) + assertMatches(t, conn, `select id, val1, val2 from test`, `[[INT64(80) NULL INT64(2)]]`) + assertMatches(t, conn, `select @@sql_safe_updates`, `[[INT64(1)]]`) + + checkedExec(t, conn, `update test set val1 = 'text' where val1 is null`) + assertMatches(t, conn, `select id, val1, val2 from test`, `[[INT64(80) VARCHAR("text") INT64(2)]]`) + assertMatches(t, conn, `select @@sql_safe_updates`, `[[INT64(1)]]`) + + checkedExec(t, conn, `delete from test where val1 = 'text'`) + assertMatches(t, conn, `select id, val1, val2 from test`, `[]`) + assertMatches(t, conn, `select @@sql_safe_updates`, `[[INT64(1)]]`) +} + func TestStartTxAndSetSystemVariableAndThenSuccessfulCommit(t *testing.T) { vtParams := mysql.ConnParams{ Host: "localhost", From 3b73b84b2d07cb190935410e70edfe6814693bc5 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 21 Sep 2020 21:21:27 +0530 Subject: [PATCH 06/15] handle reserved connection in autocommit enabled Signed-off-by: Harshit Gangal --- go/test/endtoend/vtgate/setstatement/sysvar_test.go | 4 ++-- go/vt/vtgate/safe_session.go | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/go/test/endtoend/vtgate/setstatement/sysvar_test.go b/go/test/endtoend/vtgate/setstatement/sysvar_test.go index 9a7e98bafc3..424e2f65e3f 100644 --- a/go/test/endtoend/vtgate/setstatement/sysvar_test.go +++ b/go/test/endtoend/vtgate/setstatement/sysvar_test.go @@ -210,11 +210,11 @@ func TestSetSystemVariableAndThenSuccessfulAutocommitDML(t *testing.T) { assertMatches(t, conn, `select @@sql_safe_updates`, `[[INT64(1)]]`) checkedExec(t, conn, `update test set val2 = 2 where val1 is null`) - assertMatches(t, conn, `select id, val1, val2 from test`, `[[INT64(80) NULL INT64(2)]]`) + assertMatches(t, conn, `select id, val1, val2 from test`, `[[INT64(80) NULL INT32(2)]]`) assertMatches(t, conn, `select @@sql_safe_updates`, `[[INT64(1)]]`) checkedExec(t, conn, `update test set val1 = 'text' where val1 is null`) - assertMatches(t, conn, `select id, val1, val2 from test`, `[[INT64(80) VARCHAR("text") INT64(2)]]`) + assertMatches(t, conn, `select id, val1, val2 from test`, `[[INT64(80) VARCHAR("text") INT32(2)]]`) assertMatches(t, conn, `select @@sql_safe_updates`, `[[INT64(1)]]`) checkedExec(t, conn, `delete from test where val1 = 'text'`) diff --git a/go/vt/vtgate/safe_session.go b/go/vt/vtgate/safe_session.go index a4de83364ee..ea386da1fc5 100644 --- a/go/vt/vtgate/safe_session.go +++ b/go/vt/vtgate/safe_session.go @@ -214,7 +214,10 @@ func (session *SafeSession) AppendOrUpdate(shardSession *vtgatepb.Session_ShardS session.mu.Lock() defer session.mu.Unlock() - if session.autocommitState == autocommitted { + // additional check of transaction id is required + // as now in autocommit mode there can be session due to reserved connection + // that needs to be stored as shard session. + if session.autocommitState == autocommitted && shardSession.TransactionId != 0 { // Should be unreachable return vterrors.New(vtrpcpb.Code_INTERNAL, "BUG: SafeSession.AppendOrUpdate: unexpected autocommit state") } From 937dd29983a9d006c4c7089f657b77e133da8432 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Mon, 21 Sep 2020 12:31:19 +0200 Subject: [PATCH 07/15] operator precedence must take associativity into consideration Signed-off-by: Andres Taylor --- go/vt/sqlparser/ast.go | 12 ++++----- go/vt/sqlparser/precedence_test.go | 8 ++++++ go/vt/sqlparser/tracked_buffer.go | 42 +++++++++++++++++++++++------- 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/go/vt/sqlparser/ast.go b/go/vt/sqlparser/ast.go index 60a699d993a..67dc784275f 100644 --- a/go/vt/sqlparser/ast.go +++ b/go/vt/sqlparser/ast.go @@ -1537,17 +1537,17 @@ func (node Exprs) Format(buf *TrackedBuffer) { // Format formats the node. func (node *AndExpr) Format(buf *TrackedBuffer) { - buf.astPrintf(node, "%v and %v", node.Left, node.Right) + buf.astPrintf(node, "%l and %r", node.Left, node.Right) } // Format formats the node. func (node *OrExpr) Format(buf *TrackedBuffer) { - buf.astPrintf(node, "%v or %v", node.Left, node.Right) + buf.astPrintf(node, "%l or %r", node.Left, node.Right) } // Format formats the node. func (node *XorExpr) Format(buf *TrackedBuffer) { - buf.astPrintf(node, "%v xor %v", node.Left, node.Right) + buf.astPrintf(node, "%l xor %r", node.Left, node.Right) } // Format formats the node. @@ -1557,7 +1557,7 @@ func (node *NotExpr) Format(buf *TrackedBuffer) { // Format formats the node. func (node *ComparisonExpr) Format(buf *TrackedBuffer) { - buf.astPrintf(node, "%v %s %v", node.Left, node.Operator, node.Right) + buf.astPrintf(node, "%l %s %r", node.Left, node.Operator, node.Right) if node.Escape != nil { buf.astPrintf(node, " escape %v", node.Escape) } @@ -1565,7 +1565,7 @@ func (node *ComparisonExpr) Format(buf *TrackedBuffer) { // Format formats the node. func (node *RangeCond) Format(buf *TrackedBuffer) { - buf.astPrintf(node, "%v %s %v and %v", node.Left, node.Operator, node.From, node.To) + buf.astPrintf(node, "%v %s %l and %r", node.Left, node.Operator, node.From, node.To) } // Format formats the node. @@ -1635,7 +1635,7 @@ func (node ListArg) Format(buf *TrackedBuffer) { // Format formats the node. func (node *BinaryExpr) Format(buf *TrackedBuffer) { - buf.astPrintf(node, "%v %s %v", node.Left, node.Operator, node.Right) + buf.astPrintf(node, "%l %s %r", node.Left, node.Operator, node.Right) } // Format formats the node. diff --git a/go/vt/sqlparser/precedence_test.go b/go/vt/sqlparser/precedence_test.go index c0676dee9be..7f8c65032a7 100644 --- a/go/vt/sqlparser/precedence_test.go +++ b/go/vt/sqlparser/precedence_test.go @@ -142,6 +142,14 @@ func TestParens(t *testing.T) { {in: "(a | b) between (5) and (7)", expected: "a | b between 5 and 7"}, {in: "(a and b) between (5) and (7)", expected: "(a and b) between 5 and 7"}, {in: "(true is true) is null", expected: "(true is true) is null"}, + {in: "3 * (100 div 3)", expected: "3 * (100 div 3)"}, + {in: "100 div 2 div 2", expected: "100 div 2 div 2"}, + {in: "100 div (2 div 2)", expected: "100 div (2 div 2)"}, + {in: "(100 div 2) div 2", expected: "100 div 2 div 2"}, + {in: "((((((1000))))))", expected: "1000"}, + {in: "100 - (50 + 10)", expected: "100 - (50 + 10)"}, + {in: "100 - 50 + 10", expected: "100 - 50 + 10"}, + {in: "true and (true and true)", expected: "true and (true and true)"}, } for _, tc := range tests { diff --git a/go/vt/sqlparser/tracked_buffer.go b/go/vt/sqlparser/tracked_buffer.go index 8f5fef1f689..a8230360139 100644 --- a/go/vt/sqlparser/tracked_buffer.go +++ b/go/vt/sqlparser/tracked_buffer.go @@ -56,7 +56,9 @@ func (buf *TrackedBuffer) WriteNode(node SQLNode) *TrackedBuffer { // Myprintf mimics fmt.Fprintf(buf, ...), but limited to Node(%v), // Node.Value(%s) and string(%s). It also allows a %a for a value argument, in // which case it adds tracking info for future substitutions. -// It adds parens as needed to follow precedence rules when printing expressions +// It adds parens as needed to follow precedence rules when printing expressions. +// To handle parens correctly for left associative binary operators, +// use %l and %r to tell the TrackedBuffer which value is on the LHS and RHS // // The name must be something other than the usual Printf() to avoid "go vet" // warnings due to our custom format specifiers. @@ -87,7 +89,8 @@ func (buf *TrackedBuffer) astPrintf(currentNode SQLNode, format string, values . break } i++ // '%' - switch format[i] { + token := format[i] + switch token { case 'c': switch v := values[fieldnum].(type) { case byte: @@ -106,19 +109,19 @@ func (buf *TrackedBuffer) astPrintf(currentNode SQLNode, format string, values . default: panic(fmt.Sprintf("unexpected TrackedBuffer type %T", v)) } - case 'v': + case 'l', 'r', 'v': + left := token != 'r' value := values[fieldnum] expr := getExpressionForParensEval(checkParens, value) - if expr != nil { // - needParens := needParens(currentExpr, expr) + if expr == nil { + buf.formatter(value.(SQLNode)) + } else { + needParens := needParens(currentExpr, expr, left) buf.printIf(needParens, "(") buf.formatter(expr) buf.printIf(needParens, ")") - } else { - buf.formatter(value.(SQLNode)) } - case 'a': buf.WriteArg(values[fieldnum].(string)) default: @@ -153,7 +156,16 @@ func (buf *TrackedBuffer) formatter(node SQLNode) { } } -func needParens(op, val Expr) bool { +//needParens says if we need a parenthesis +// op is the operator we are printing +// val is the value we are checking if we need parens around or not +// left let's us know if the value is on the lhs or rhs of the operator +func needParens(op, val Expr, left bool) bool { + // Values are atomic and never need parens + if IsValue(val) { + return false + } + if areBothISExpr(op, val) { return true } @@ -161,7 +173,17 @@ func needParens(op, val Expr) bool { opBinding := precedenceFor(op) valBinding := precedenceFor(val) - return !(opBinding == Syntactic || valBinding == Syntactic) && valBinding > opBinding + if opBinding == Syntactic || valBinding == Syntactic { + return false + } + + if left { + // for left associative operators, if the value is to the left of the operator, + // we only need parens if the order is higher for the value expression + return valBinding > opBinding + } + + return valBinding >= opBinding } func areBothISExpr(op Expr, val Expr) bool { From 9af279d631a512de942e29b8c867285e42fa3c60 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Mon, 21 Sep 2020 13:13:23 +0200 Subject: [PATCH 08/15] add more precedence tests Signed-off-by: Andres Taylor --- go/vt/sqlparser/precedence_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/vt/sqlparser/precedence_test.go b/go/vt/sqlparser/precedence_test.go index 7f8c65032a7..a74c44f4e12 100644 --- a/go/vt/sqlparser/precedence_test.go +++ b/go/vt/sqlparser/precedence_test.go @@ -150,6 +150,9 @@ func TestParens(t *testing.T) { {in: "100 - (50 + 10)", expected: "100 - (50 + 10)"}, {in: "100 - 50 + 10", expected: "100 - 50 + 10"}, {in: "true and (true and true)", expected: "true and (true and true)"}, + {in: "10 - 2 - 1", expected: "10 - 2 - 1"}, + {in: "(10 - 2) - 1", expected: "10 - 2 - 1"}, + {in: "10 - (2 - 1)", expected: "10 - (2 - 1)"}, } for _, tc := range tests { From e5c2bb1b3224902d394071d17138adade5932d33 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Tue, 22 Sep 2020 15:20:35 +0200 Subject: [PATCH 09/15] Merge pull request #6766 from GuptaManan100/warning-for-context Replaced Error with a warning in case parsing of VT_SPAN_CONTEXT fails --- go/vt/vtgate/plugin_mysql_server.go | 23 ++++++++++++++--------- go/vt/vtgate/plugin_mysql_server_test.go | 19 +++++++++++++++++++ 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go index ded421e99b7..260d04b8493 100644 --- a/go/vt/vtgate/plugin_mysql_server.go +++ b/go/vt/vtgate/plugin_mysql_server.go @@ -147,20 +147,25 @@ func startSpanTestable(ctx context.Context, query, label string, newSpanFromString func(context.Context, string, string) (trace.Span, context.Context, error)) (trace.Span, context.Context, error) { _, comments := sqlparser.SplitMarginComments(query) match := r.FindStringSubmatch(comments.Leading) + span, ctx := getSpan(ctx, match, newSpan, label, newSpanFromString) + + trace.AnnotateSQL(span, query) + + return span, ctx, nil +} + +func getSpan(ctx context.Context, match []string, newSpan func(context.Context, string) (trace.Span, context.Context), label string, newSpanFromString func(context.Context, string, string) (trace.Span, context.Context, error)) (trace.Span, context.Context) { var span trace.Span - if len(match) == 0 { - span, ctx = newSpan(ctx, label) - } else { + if len(match) != 0 { var err error span, ctx, err = newSpanFromString(ctx, match[1], label) - if err != nil { - return nil, nil, err + if err == nil { + return span, ctx } + log.Warningf("Unable to parse VT_SPAN_CONTEXT: %s", err.Error()) } - - trace.AnnotateSQL(span, query) - - return span, ctx, nil + span, ctx = newSpan(ctx, label) + return span, ctx } func startSpan(ctx context.Context, query, label string) (trace.Span, context.Context, error) { diff --git a/go/vt/vtgate/plugin_mysql_server_test.go b/go/vt/vtgate/plugin_mysql_server_test.go index 711b1ff861b..eebec610326 100644 --- a/go/vt/vtgate/plugin_mysql_server_test.go +++ b/go/vt/vtgate/plugin_mysql_server_test.go @@ -17,6 +17,7 @@ limitations under the License. package vtgate import ( + "fmt" "io/ioutil" "os" "path" @@ -170,6 +171,12 @@ func newFromStringFail(t *testing.T) func(ctx context.Context, parentSpan string } } +func newFromStringError(t *testing.T) func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) { + return func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) { + return trace.NoopSpan{}, context.Background(), fmt.Errorf("") + } +} + func newFromStringExpect(t *testing.T, expected string) func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) { return func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) { assert.Equal(t, expected, parentSpan) @@ -206,6 +213,18 @@ func TestSpanContextPassedInEvenAroundOtherComments(t *testing.T) { assert.NoError(t, err) } +func TestSpanContextNotParsable(t *testing.T) { + hasRun := false + _, _, err := startSpanTestable(context.Background(), "/*VT_SPAN_CONTEXT=123*/SQL QUERY", "someLabel", + func(c context.Context, s string) (trace.Span, context.Context) { + hasRun = true + return trace.NoopSpan{}, context.Background() + }, + newFromStringError(t)) + assert.NoError(t, err) + assert.True(t, hasRun, "Should have continued execution despite failure to parse VT_SPAN_CONTEXT") +} + func newTestAuthServerStatic() *mysql.AuthServerStatic { jsonConfig := "{\"user1\":{\"Password\":\"password1\", \"UserData\":\"userData1\", \"SourceHost\":\"localhost\"}}" return mysql.NewAuthServerStatic("", jsonConfig, 0) From 360f85fc8269b25f62299c333375093adc87afce Mon Sep 17 00:00:00 2001 From: Rafael Chacon Date: Tue, 22 Sep 2020 09:26:56 -0400 Subject: [PATCH 10/15] Merge pull request #6762 from planetscale/ds-set-readonly tm: call setReadOnly inside ChangeTabletType --- go/test/endtoend/tabletmanager/tablet_test.go | 27 +++++++++++++++++++ .../vttablet/tabletmanager/rpc_replication.go | 10 ------- go/vt/vttablet/tabletmanager/tm_state.go | 5 ++++ 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/go/test/endtoend/tabletmanager/tablet_test.go b/go/test/endtoend/tabletmanager/tablet_test.go index 8277ee13aed..851ae498243 100644 --- a/go/test/endtoend/tabletmanager/tablet_test.go +++ b/go/test/endtoend/tabletmanager/tablet_test.go @@ -26,6 +26,33 @@ import ( "vitess.io/vitess/go/vt/log" ) +// TestEnsureDB tests that vttablet creates the db as needed +func TestEnsureDB(t *testing.T) { + defer cluster.PanicHandler(t) + + // Create new tablet + tablet := clusterInstance.NewVttabletInstance("replica", 0, "") + tablet.MysqlctlProcess = *cluster.MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, clusterInstance.TmpDirectory) + err := tablet.MysqlctlProcess.Start() + require.NoError(t, err) + + log.Info(fmt.Sprintf("Started vttablet %v", tablet)) + // Start vttablet process as replica. It won't be able to serve because there's no db. + err = clusterInstance.StartVttablet(tablet, "NOT_SERVING", false, cell, "dbtest", hostname, "0") + require.NoError(t, err) + + // Make it the master. + err = clusterInstance.VtctlclientProcess.ExecuteCommand("TabletExternallyReparented", tablet.Alias) + require.NoError(t, err) + + // It goes SERVING because TER calls ChangeTabletType which will also set the database to read-write + assert.Equal(t, "SERVING", tablet.VttabletProcess.GetTabletStatus()) + status := tablet.VttabletProcess.GetStatusDetails() + assert.Contains(t, status, "Serving") + + killTablets(t, tablet) +} + // TestLocalMetadata tests the contents of local_metadata table after vttablet startup func TestLocalMetadata(t *testing.T) { defer cluster.PanicHandler(t) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 1db7e7dccde..240f8772232 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -249,10 +249,6 @@ func (tm *TabletManager) InitMaster(ctx context.Context) (string, error) { // Set the server read-write, from now on we can accept real // client writes. Note that if semi-sync replication is enabled, // we'll still need some replicas to be able to commit transactions. - if err := tm.MysqlDaemon.SetReadOnly(false); err != nil { - return "", err - } - if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_MASTER); err != nil { return "", err } @@ -739,12 +735,6 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context) (string, error) { return "", err } - // We call SetReadOnly only after the topo has been updated to avoid - // situations where two tablets are master at the DB level but not at the vitess level - if err := tm.MysqlDaemon.SetReadOnly(false); err != nil { - return "", err - } - return mysql.EncodePosition(pos), nil } diff --git a/go/vt/vttablet/tabletmanager/tm_state.go b/go/vt/vttablet/tabletmanager/tm_state.go index fad4a8b9048..80a4d5e05b5 100644 --- a/go/vt/vttablet/tabletmanager/tm_state.go +++ b/go/vt/vttablet/tabletmanager/tm_state.go @@ -163,6 +163,11 @@ func (ts *tmState) ChangeTabletType(ctx context.Context, tabletType topodatapb.T if err != nil { return err } + // We call SetReadOnly only after the topo has been updated to avoid + // situations where two tablets are master at the DB level but not at the vitess level + if err := ts.tm.MysqlDaemon.SetReadOnly(false); err != nil { + return err + } ts.tablet.Type = tabletType ts.tablet.MasterTermStartTime = masterTermStartTime From daf163754a1856ea67b724bdeb29626f99de6d57 Mon Sep 17 00:00:00 2001 From: Ameet Kotian Date: Thu, 24 Sep 2020 14:37:53 -0700 Subject: [PATCH 11/15] Fix issues with cherry pick merge --- go/test/endtoend/tabletmanager/tablet_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/test/endtoend/tabletmanager/tablet_test.go b/go/test/endtoend/tabletmanager/tablet_test.go index 851ae498243..8e6a1ad5a5e 100644 --- a/go/test/endtoend/tabletmanager/tablet_test.go +++ b/go/test/endtoend/tabletmanager/tablet_test.go @@ -20,6 +20,7 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/test/endtoend/cluster" From 6b9229e4a13f3e4e82767ad449446a7327b9f305 Mon Sep 17 00:00:00 2001 From: Deepthi Sigireddi Date: Thu, 17 Sep 2020 19:55:19 -0700 Subject: [PATCH 12/15] Merge pull request #6721 from tinyspeck/fixes-long-wait-filter-keyspace-upstream Fixes long wait filter keyspace --- go/vt/discovery/healthcheck.go | 27 ++++++ go/vt/discovery/healthcheck_test.go | 89 +++++++++++++++++++ .../legacy_tablet_stats_cache_wait.go | 6 ++ go/vt/vtgate/discoverygateway.go | 3 +- go/vt/vtgate/discoverygateway_test.go | 78 ++++++++++++++++ go/vt/vttablet/tabletconn/tablet_conn.go | 8 ++ 6 files changed, 210 insertions(+), 1 deletion(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index d0ca06a7062..5a94a372b64 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -612,8 +612,30 @@ func (hc *HealthCheckImpl) WaitForAllServingTablets(ctx context.Context, targets return hc.waitForTablets(ctx, targets, true) } +// FilterTargetsByKeyspaces only returns the targets that are part of the provided keyspaces +func FilterTargetsByKeyspaces(keyspaces []string, targets []*query.Target) []*query.Target { + filteredTargets := make([]*query.Target, 0) + + // Keep them all if there are no keyspaces to watch + if len(KeyspacesToWatch) == 0 { + return append(filteredTargets, targets...) + } + + // Let's remove from the target shards that are not in the keyspaceToWatch list. + for _, target := range targets { + for _, keyspaceToWatch := range keyspaces { + if target.Keyspace == keyspaceToWatch { + filteredTargets = append(filteredTargets, target) + } + } + } + return filteredTargets +} + // waitForTablets is the internal method that polls for tablets. func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.Target, requireServing bool) error { + targets = FilterTargetsByKeyspaces(KeyspacesToWatch, targets) + for { // We nil targets as we find them. allPresent := true @@ -645,6 +667,11 @@ func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query. select { case <-ctx.Done(): timer.Stop() + for _, target := range targets { + if target != nil { + log.Infof("couldn't find tablets for target: %v", target) + } + } return ctx.Err() case <-timer.C: } diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index 98bb5c57aea..a4b2483d47a 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -44,6 +44,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletconn" + "vitess.io/vitess/go/vt/proto/query" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -425,6 +426,94 @@ func TestHealthCheckTimeout(t *testing.T) { mustMatch(t, want, result, "Wrong TabletHealth data") } +func TestWaitForAllServingTablets(t *testing.T) { + ts := memorytopo.NewServer("cell") + hc := createTestHc(ts) + defer hc.Close() + tablet := createTestTablet(0, "cell", "a") + tablet.Type = topodatapb.TabletType_REPLICA + targets := []*query.Target{ + { + Keyspace: tablet.Keyspace, + Shard: tablet.Shard, + TabletType: tablet.Type, + }, + } + input := make(chan *querypb.StreamHealthResponse) + createFakeConn(tablet, input) + + // create a channel and subscribe to healthcheck + resultChan := hc.Subscribe() + hc.AddTablet(tablet) + // there will be a first result, get and discard it + <-resultChan + // empty + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + err := hc.WaitForAllServingTablets(ctx, targets) + assert.NotNil(t, err, "error should not be nil") + + shr := &querypb.StreamHealthResponse{ + TabletAlias: tablet.Alias, + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, + Serving: true, + TabletExternallyReparentedTimestamp: 0, + RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2}, + } + + input <- shr + <-resultChan + // // check it's there + + targets = []*query.Target{ + { + Keyspace: tablet.Keyspace, + Shard: tablet.Shard, + TabletType: tablet.Type, + }, + } + + err = hc.WaitForAllServingTablets(ctx, targets) + assert.Nil(t, err, "error should be nil. Targets are found") + + targets = []*query.Target{ + { + Keyspace: tablet.Keyspace, + Shard: tablet.Shard, + TabletType: tablet.Type, + }, + { + Keyspace: "newkeyspace", + Shard: tablet.Shard, + TabletType: tablet.Type, + }, + } + + err = hc.WaitForAllServingTablets(ctx, targets) + assert.NotNil(t, err, "error should not be nil (there are no tablets on this keyspace") + + targets = []*query.Target{ + { + Keyspace: tablet.Keyspace, + Shard: tablet.Shard, + TabletType: tablet.Type, + }, + { + Keyspace: "newkeyspace", + Shard: tablet.Shard, + TabletType: tablet.Type, + }, + } + + KeyspacesToWatch = []string{tablet.Keyspace} + + err = hc.WaitForAllServingTablets(ctx, targets) + assert.Nil(t, err, "error should be nil. Keyspace with no tablets is filtered") + + KeyspacesToWatch = []string{} +} + // TestGetHealthyTablets tests the functionality of GetHealthyTabletStats. func TestGetHealthyTablets(t *testing.T) { ts := memorytopo.NewServer("cell") diff --git a/go/vt/discovery/legacy_tablet_stats_cache_wait.go b/go/vt/discovery/legacy_tablet_stats_cache_wait.go index d984ff7d1ce..976aa9e7760 100644 --- a/go/vt/discovery/legacy_tablet_stats_cache_wait.go +++ b/go/vt/discovery/legacy_tablet_stats_cache_wait.go @@ -21,6 +21,7 @@ import ( "golang.org/x/net/context" + "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -84,6 +85,11 @@ func (tc *LegacyTabletStatsCache) waitForTablets(ctx context.Context, targets [] timer := time.NewTimer(waitAvailableTabletInterval) select { case <-ctx.Done(): + for _, target := range targets { + if target != nil { + log.Infof("couldn't find tablets for target: %v", target) + } + } timer.Stop() return ctx.Err() case <-timer.C: diff --git a/go/vt/vtgate/discoverygateway.go b/go/vt/vtgate/discoverygateway.go index fbcb0becaa5..11a47bdbfda 100644 --- a/go/vt/vtgate/discoverygateway.go +++ b/go/vt/vtgate/discoverygateway.go @@ -214,7 +214,8 @@ func (dg *DiscoveryGateway) WaitForTablets(ctx context.Context, tabletTypesToWai return err } - return dg.tsc.WaitForAllServingTablets(ctx, targets) + filteredTargets := discovery.FilterTargetsByKeyspaces(discovery.KeyspacesToWatch, targets) + return dg.tsc.WaitForAllServingTablets(ctx, filteredTargets) } // Close shuts down underlying connections. diff --git a/go/vt/vtgate/discoverygateway_test.go b/go/vt/vtgate/discoverygateway_test.go index 95772daf592..dc09081baaf 100644 --- a/go/vt/vtgate/discoverygateway_test.go +++ b/go/vt/vtgate/discoverygateway_test.go @@ -20,6 +20,7 @@ import ( "fmt" "strings" "testing" + "time" "vitess.io/vitess/go/vt/log" @@ -34,6 +35,7 @@ import ( "vitess.io/vitess/go/vt/vterrors" querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/proto/topodata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) @@ -134,6 +136,82 @@ func TestDiscoveryGatewayGetTablets(t *testing.T) { } } +func TestDiscoveryGatewayWaitForTablets(t *testing.T) { + keyspace := "ks" + shard := "0" + cell := "local" + hc := discovery.NewFakeLegacyHealthCheck() + ts := memorytopo.NewServer("local") + srvTopo := srvtopotest.NewPassthroughSrvTopoServer() + srvTopo.TopoServer = ts + srvTopo.SrvKeyspaceNames = []string{keyspace} + srvTopo.SrvKeyspace = &topodatapb.SrvKeyspace{ + Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{ + { + ServedType: topodata.TabletType_MASTER, + ShardReferences: []*topodatapb.ShardReference{ + { + Name: shard, + }, + }, + }, + { + ServedType: topodata.TabletType_REPLICA, + ShardReferences: []*topodatapb.ShardReference{ + { + Name: shard, + }, + }, + }, + { + ServedType: topodata.TabletType_RDONLY, + ShardReferences: []*topodatapb.ShardReference{ + { + Name: shard, + }, + }, + }, + }, + } + + dg := NewDiscoveryGateway(context.Background(), hc, srvTopo, "local", 2) + + // replica should only use local ones + hc.Reset() + dg.tsc.ResetForTesting() + hc.AddTestTablet(cell, "2.2.2.2", 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil) + hc.AddTestTablet(cell, "1.1.1.1", 1001, keyspace, shard, topodatapb.TabletType_MASTER, true, 5, nil) + ctx, _ := context.WithTimeout(context.Background(), 1*time.Second) + err := dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_MASTER}) + if err != nil { + t.Errorf("want %+v, got %+v", nil, err) + } + + // fails if there are no available tablets for the desired TabletType + err = dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_RDONLY}) + if err == nil { + t.Errorf("expected error, got nil") + } + + // errors because there is no primary on ks2 + ctx, _ = context.WithTimeout(context.Background(), 1*time.Second) + srvTopo.SrvKeyspaceNames = []string{keyspace, "ks2"} + err = dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_MASTER}) + if err == nil { + t.Errorf("expected error, got nil") + } + + discovery.KeyspacesToWatch = []string{keyspace} + // does not wait for ks2 if it's not part of the filter + ctx, _ = context.WithTimeout(context.Background(), 1*time.Second) + err = dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_MASTER}) + if err != nil { + t.Errorf("want %+v, got %+v", nil, err) + } + + discovery.KeyspacesToWatch = []string{} +} + func TestShuffleTablets(t *testing.T) { ts1 := discovery.LegacyTabletStats{ Key: "t1", diff --git a/go/vt/vttablet/tabletconn/tablet_conn.go b/go/vt/vttablet/tabletconn/tablet_conn.go index 2580c34fad0..97a6ea2ee1b 100644 --- a/go/vt/vttablet/tabletconn/tablet_conn.go +++ b/go/vt/vttablet/tabletconn/tablet_conn.go @@ -18,6 +18,7 @@ package tabletconn import ( "flag" + "sync" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/log" @@ -50,9 +51,14 @@ type TabletDialer func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) var dialers = make(map[string]TabletDialer) +// mu This mutex helps us prevent data races when registering / getting dialers +var mu sync.Mutex + // RegisterDialer is meant to be used by TabletDialer implementations // to self register. func RegisterDialer(name string, dialer TabletDialer) { + mu.Lock() + defer mu.Unlock() if _, ok := dialers[name]; ok { log.Fatalf("Dialer %s already exists", name) } @@ -61,6 +67,8 @@ func RegisterDialer(name string, dialer TabletDialer) { // GetDialer returns the dialer to use, described by the command line flag func GetDialer() TabletDialer { + mu.Lock() + defer mu.Unlock() td, ok := dialers[*TabletProtocol] if !ok { log.Exitf("No dialer registered for tablet protocol %s", *TabletProtocol) From c1842d86c44697909579186af9369c6f2006108b Mon Sep 17 00:00:00 2001 From: deepthi Date: Thu, 24 Sep 2020 15:42:25 -0700 Subject: [PATCH 13/15] tm: change how SetReadOnly is called to avoid errors from externally managed tablets Signed-off-by: deepthi --- go/test/endtoend/tabletmanager/tablet_test.go | 10 +++++++--- go/vt/vttablet/tabletmanager/restore.go | 6 +++--- go/vt/vttablet/tabletmanager/rpc_actions.go | 16 +++++++++++++--- go/vt/vttablet/tabletmanager/rpc_backup.go | 4 ++-- go/vt/vttablet/tabletmanager/rpc_replication.go | 10 +++++----- go/vt/vttablet/tabletmanager/tm_state.go | 12 +++++++----- go/vt/vttablet/tabletmanager/tm_state_test.go | 4 ++-- 7 files changed, 39 insertions(+), 23 deletions(-) diff --git a/go/test/endtoend/tabletmanager/tablet_test.go b/go/test/endtoend/tabletmanager/tablet_test.go index 8e6a1ad5a5e..0ce8cd94276 100644 --- a/go/test/endtoend/tabletmanager/tablet_test.go +++ b/go/test/endtoend/tabletmanager/tablet_test.go @@ -46,11 +46,15 @@ func TestEnsureDB(t *testing.T) { err = clusterInstance.VtctlclientProcess.ExecuteCommand("TabletExternallyReparented", tablet.Alias) require.NoError(t, err) - // It goes SERVING because TER calls ChangeTabletType which will also set the database to read-write - assert.Equal(t, "SERVING", tablet.VttabletProcess.GetTabletStatus()) + // It is still NOT_SERVING because the db is read-only. + assert.Equal(t, "NOT_SERVING", tablet.VttabletProcess.GetTabletStatus()) status := tablet.VttabletProcess.GetStatusDetails() - assert.Contains(t, status, "Serving") + assert.Contains(t, status, "read-only") + // Switch to read-write and verify that that we go serving. + _ = clusterInstance.VtctlclientProcess.ExecuteCommand("SetReadWrite", tablet.Alias) + err = tablet.VttabletProcess.WaitForTabletType("SERVING") + require.NoError(t, err) killTablets(t, tablet) } diff --git a/go/vt/vttablet/tabletmanager/restore.go b/go/vt/vttablet/tabletmanager/restore.go index 078b49674dc..20bc4524956 100644 --- a/go/vt/vttablet/tabletmanager/restore.go +++ b/go/vt/vttablet/tabletmanager/restore.go @@ -129,7 +129,7 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L if originalType == topodatapb.TabletType_MASTER { originalType = tm.baseTabletType } - if err := tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_RESTORE); err != nil { + if err := tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_RESTORE, DBActionNone); err != nil { return err } // Loop until a backup exists, unless we were told to give up immediately. @@ -178,7 +178,7 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L // No-op, starting with empty database. default: // If anything failed, we should reset the original tablet type - if err := tm.tmState.ChangeTabletType(ctx, originalType); err != nil { + if err := tm.tmState.ChangeTabletType(ctx, originalType, DBActionNone); err != nil { log.Errorf("Could not change back to original tablet type %v: %v", originalType, err) } return vterrors.Wrap(err, "Can't restore backup") @@ -194,7 +194,7 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L } // Change type back to original type if we're ok to serve. - return tm.tmState.ChangeTabletType(ctx, originalType) + return tm.tmState.ChangeTabletType(ctx, originalType, DBActionNone) } // restoreToTimeFromBinlog restores to the snapshot time of the keyspace diff --git a/go/vt/vttablet/tabletmanager/rpc_actions.go b/go/vt/vttablet/tabletmanager/rpc_actions.go index 0992d63b4cf..f6eba93f651 100644 --- a/go/vt/vttablet/tabletmanager/rpc_actions.go +++ b/go/vt/vttablet/tabletmanager/rpc_actions.go @@ -33,6 +33,16 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) +// DBAction is used to tell ChangeTabletType whether to call SetReadOnly on change to +// MASTER tablet type +type DBAction int + +// Allowed values for DBAction +const ( + DBActionNone = DBAction(iota) + DBActionSetReadWrite +) + // This file contains the implementations of RPCTM methods. // Major groups of methods are broken out into files named "rpc_*.go". @@ -62,11 +72,11 @@ func (tm *TabletManager) ChangeType(ctx context.Context, tabletType topodatapb.T return err } defer tm.unlock() - return tm.changeTypeLocked(ctx, tabletType) + return tm.changeTypeLocked(ctx, tabletType, DBActionNone) } // ChangeType changes the tablet type -func (tm *TabletManager) changeTypeLocked(ctx context.Context, tabletType topodatapb.TabletType) error { +func (tm *TabletManager) changeTypeLocked(ctx context.Context, tabletType topodatapb.TabletType, action DBAction) error { // We don't want to allow multiple callers to claim a tablet as drained. There is a race that could happen during // horizontal resharding where two vtworkers will try to DRAIN the same tablet. This check prevents that race from // causing errors. @@ -74,7 +84,7 @@ func (tm *TabletManager) changeTypeLocked(ctx context.Context, tabletType topoda return fmt.Errorf("Tablet: %v, is already drained", tm.tabletAlias) } - if err := tm.tmState.ChangeTabletType(ctx, tabletType); err != nil { + if err := tm.tmState.ChangeTabletType(ctx, tabletType, action); err != nil { return err } diff --git a/go/vt/vttablet/tabletmanager/rpc_backup.go b/go/vt/vttablet/tabletmanager/rpc_backup.go index cbb5f179f61..b5666672402 100644 --- a/go/vt/vttablet/tabletmanager/rpc_backup.go +++ b/go/vt/vttablet/tabletmanager/rpc_backup.go @@ -84,7 +84,7 @@ func (tm *TabletManager) Backup(ctx context.Context, concurrency int, logger log } originalType = tablet.Type // update our type to BACKUP - if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_BACKUP); err != nil { + if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_BACKUP, DBActionNone); err != nil { return err } } @@ -115,7 +115,7 @@ func (tm *TabletManager) Backup(ctx context.Context, concurrency int, logger log // Change our type back to the original value. // Original type could be master so pass in a real value for masterTermStartTime - if err := tm.changeTypeLocked(bgCtx, originalType); err != nil { + if err := tm.changeTypeLocked(bgCtx, originalType, DBActionNone); err != nil { // failure in changing the topology type is probably worse, // so returning that (we logged the snapshot error anyway) if returnErr != nil { diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 240f8772232..40e7a499995 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -249,7 +249,7 @@ func (tm *TabletManager) InitMaster(ctx context.Context) (string, error) { // Set the server read-write, from now on we can accept real // client writes. Note that if semi-sync replication is enabled, // we'll still need some replicas to be able to commit transactions. - if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_MASTER); err != nil { + if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_MASTER, DBActionSetReadWrite); err != nil { return "", err } return mysql.EncodePosition(pos), nil @@ -279,7 +279,7 @@ func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.Tab // is used on the old master when using InitShardMaster with // -force, and the new master is different from the old master. if tm.Tablet().Type == topodatapb.TabletType_MASTER { - if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_REPLICA); err != nil { + if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_REPLICA, DBActionNone); err != nil { return err } } @@ -518,7 +518,7 @@ func (tm *TabletManager) setMasterLocked(ctx context.Context, parentAlias *topod // unintentionally change the type of RDONLY tablets tablet := tm.Tablet() if tablet.Type == topodatapb.TabletType_MASTER { - if err := tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_REPLICA); err != nil { + if err := tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_REPLICA, DBActionNone); err != nil { return err } } @@ -620,7 +620,7 @@ func (tm *TabletManager) ReplicaWasRestarted(ctx context.Context, parent *topoda if tablet.Type != topodatapb.TabletType_MASTER { return nil } - return tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_REPLICA) + return tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_REPLICA, DBActionNone) } // StopReplicationAndGetStatus stops MySQL replication, and returns the @@ -731,7 +731,7 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context) (string, error) { return "", err } - if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_MASTER); err != nil { + if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_MASTER, DBActionSetReadWrite); err != nil { return "", err } diff --git a/go/vt/vttablet/tabletmanager/tm_state.go b/go/vt/vttablet/tabletmanager/tm_state.go index 80a4d5e05b5..68ba9235c49 100644 --- a/go/vt/vttablet/tabletmanager/tm_state.go +++ b/go/vt/vttablet/tabletmanager/tm_state.go @@ -150,7 +150,7 @@ func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.Shar ts.updateLocked(ctx) } -func (ts *tmState) ChangeTabletType(ctx context.Context, tabletType topodatapb.TabletType) error { +func (ts *tmState) ChangeTabletType(ctx context.Context, tabletType topodatapb.TabletType, action DBAction) error { ts.mu.Lock() defer ts.mu.Unlock() log.Infof("Changing Tablet Type: %v", tabletType) @@ -163,10 +163,12 @@ func (ts *tmState) ChangeTabletType(ctx context.Context, tabletType topodatapb.T if err != nil { return err } - // We call SetReadOnly only after the topo has been updated to avoid - // situations where two tablets are master at the DB level but not at the vitess level - if err := ts.tm.MysqlDaemon.SetReadOnly(false); err != nil { - return err + if action == DBActionSetReadWrite { + // We call SetReadOnly only after the topo has been updated to avoid + // situations where two tablets are master at the DB level but not at the vitess level + if err := ts.tm.MysqlDaemon.SetReadOnly(false); err != nil { + return err + } } ts.tablet.Type = tabletType diff --git a/go/vt/vttablet/tabletmanager/tm_state_test.go b/go/vt/vttablet/tabletmanager/tm_state_test.go index 680fa41e08d..4de8c87d557 100644 --- a/go/vt/vttablet/tabletmanager/tm_state_test.go +++ b/go/vt/vttablet/tabletmanager/tm_state_test.go @@ -176,14 +176,14 @@ func TestStateChangeTabletType(t *testing.T) { Uid: 2, } - err := tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_MASTER) + err := tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_MASTER, DBActionSetReadWrite) require.NoError(t, err) ti, err := ts.GetTablet(ctx, alias) require.NoError(t, err) assert.Equal(t, topodatapb.TabletType_MASTER, ti.Type) assert.NotNil(t, ti.MasterTermStartTime) - err = tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_REPLICA) + err = tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_REPLICA, DBActionNone) require.NoError(t, err) ti, err = ts.GetTablet(ctx, alias) require.NoError(t, err) From fea82c19e6012d4ef69c4ec016144b72d24769b6 Mon Sep 17 00:00:00 2001 From: Rafael Chacon Date: Thu, 1 Oct 2020 09:44:44 -0400 Subject: [PATCH 14/15] Revert "tm: change how SetReadOnly is called to avoid errors from externally managed tablets" This reverts commit c1842d86c44697909579186af9369c6f2006108b. --- go/test/endtoend/tabletmanager/tablet_test.go | 10 +++------- go/vt/vttablet/tabletmanager/restore.go | 6 +++--- go/vt/vttablet/tabletmanager/rpc_actions.go | 16 +++------------- go/vt/vttablet/tabletmanager/rpc_backup.go | 4 ++-- go/vt/vttablet/tabletmanager/rpc_replication.go | 10 +++++----- go/vt/vttablet/tabletmanager/tm_state.go | 12 +++++------- go/vt/vttablet/tabletmanager/tm_state_test.go | 4 ++-- 7 files changed, 23 insertions(+), 39 deletions(-) diff --git a/go/test/endtoend/tabletmanager/tablet_test.go b/go/test/endtoend/tabletmanager/tablet_test.go index 0ce8cd94276..8e6a1ad5a5e 100644 --- a/go/test/endtoend/tabletmanager/tablet_test.go +++ b/go/test/endtoend/tabletmanager/tablet_test.go @@ -46,15 +46,11 @@ func TestEnsureDB(t *testing.T) { err = clusterInstance.VtctlclientProcess.ExecuteCommand("TabletExternallyReparented", tablet.Alias) require.NoError(t, err) - // It is still NOT_SERVING because the db is read-only. - assert.Equal(t, "NOT_SERVING", tablet.VttabletProcess.GetTabletStatus()) + // It goes SERVING because TER calls ChangeTabletType which will also set the database to read-write + assert.Equal(t, "SERVING", tablet.VttabletProcess.GetTabletStatus()) status := tablet.VttabletProcess.GetStatusDetails() - assert.Contains(t, status, "read-only") + assert.Contains(t, status, "Serving") - // Switch to read-write and verify that that we go serving. - _ = clusterInstance.VtctlclientProcess.ExecuteCommand("SetReadWrite", tablet.Alias) - err = tablet.VttabletProcess.WaitForTabletType("SERVING") - require.NoError(t, err) killTablets(t, tablet) } diff --git a/go/vt/vttablet/tabletmanager/restore.go b/go/vt/vttablet/tabletmanager/restore.go index 20bc4524956..078b49674dc 100644 --- a/go/vt/vttablet/tabletmanager/restore.go +++ b/go/vt/vttablet/tabletmanager/restore.go @@ -129,7 +129,7 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L if originalType == topodatapb.TabletType_MASTER { originalType = tm.baseTabletType } - if err := tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_RESTORE, DBActionNone); err != nil { + if err := tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_RESTORE); err != nil { return err } // Loop until a backup exists, unless we were told to give up immediately. @@ -178,7 +178,7 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L // No-op, starting with empty database. default: // If anything failed, we should reset the original tablet type - if err := tm.tmState.ChangeTabletType(ctx, originalType, DBActionNone); err != nil { + if err := tm.tmState.ChangeTabletType(ctx, originalType); err != nil { log.Errorf("Could not change back to original tablet type %v: %v", originalType, err) } return vterrors.Wrap(err, "Can't restore backup") @@ -194,7 +194,7 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L } // Change type back to original type if we're ok to serve. - return tm.tmState.ChangeTabletType(ctx, originalType, DBActionNone) + return tm.tmState.ChangeTabletType(ctx, originalType) } // restoreToTimeFromBinlog restores to the snapshot time of the keyspace diff --git a/go/vt/vttablet/tabletmanager/rpc_actions.go b/go/vt/vttablet/tabletmanager/rpc_actions.go index f6eba93f651..0992d63b4cf 100644 --- a/go/vt/vttablet/tabletmanager/rpc_actions.go +++ b/go/vt/vttablet/tabletmanager/rpc_actions.go @@ -33,16 +33,6 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) -// DBAction is used to tell ChangeTabletType whether to call SetReadOnly on change to -// MASTER tablet type -type DBAction int - -// Allowed values for DBAction -const ( - DBActionNone = DBAction(iota) - DBActionSetReadWrite -) - // This file contains the implementations of RPCTM methods. // Major groups of methods are broken out into files named "rpc_*.go". @@ -72,11 +62,11 @@ func (tm *TabletManager) ChangeType(ctx context.Context, tabletType topodatapb.T return err } defer tm.unlock() - return tm.changeTypeLocked(ctx, tabletType, DBActionNone) + return tm.changeTypeLocked(ctx, tabletType) } // ChangeType changes the tablet type -func (tm *TabletManager) changeTypeLocked(ctx context.Context, tabletType topodatapb.TabletType, action DBAction) error { +func (tm *TabletManager) changeTypeLocked(ctx context.Context, tabletType topodatapb.TabletType) error { // We don't want to allow multiple callers to claim a tablet as drained. There is a race that could happen during // horizontal resharding where two vtworkers will try to DRAIN the same tablet. This check prevents that race from // causing errors. @@ -84,7 +74,7 @@ func (tm *TabletManager) changeTypeLocked(ctx context.Context, tabletType topoda return fmt.Errorf("Tablet: %v, is already drained", tm.tabletAlias) } - if err := tm.tmState.ChangeTabletType(ctx, tabletType, action); err != nil { + if err := tm.tmState.ChangeTabletType(ctx, tabletType); err != nil { return err } diff --git a/go/vt/vttablet/tabletmanager/rpc_backup.go b/go/vt/vttablet/tabletmanager/rpc_backup.go index b5666672402..cbb5f179f61 100644 --- a/go/vt/vttablet/tabletmanager/rpc_backup.go +++ b/go/vt/vttablet/tabletmanager/rpc_backup.go @@ -84,7 +84,7 @@ func (tm *TabletManager) Backup(ctx context.Context, concurrency int, logger log } originalType = tablet.Type // update our type to BACKUP - if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_BACKUP, DBActionNone); err != nil { + if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_BACKUP); err != nil { return err } } @@ -115,7 +115,7 @@ func (tm *TabletManager) Backup(ctx context.Context, concurrency int, logger log // Change our type back to the original value. // Original type could be master so pass in a real value for masterTermStartTime - if err := tm.changeTypeLocked(bgCtx, originalType, DBActionNone); err != nil { + if err := tm.changeTypeLocked(bgCtx, originalType); err != nil { // failure in changing the topology type is probably worse, // so returning that (we logged the snapshot error anyway) if returnErr != nil { diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 40e7a499995..240f8772232 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -249,7 +249,7 @@ func (tm *TabletManager) InitMaster(ctx context.Context) (string, error) { // Set the server read-write, from now on we can accept real // client writes. Note that if semi-sync replication is enabled, // we'll still need some replicas to be able to commit transactions. - if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_MASTER, DBActionSetReadWrite); err != nil { + if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_MASTER); err != nil { return "", err } return mysql.EncodePosition(pos), nil @@ -279,7 +279,7 @@ func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.Tab // is used on the old master when using InitShardMaster with // -force, and the new master is different from the old master. if tm.Tablet().Type == topodatapb.TabletType_MASTER { - if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_REPLICA, DBActionNone); err != nil { + if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_REPLICA); err != nil { return err } } @@ -518,7 +518,7 @@ func (tm *TabletManager) setMasterLocked(ctx context.Context, parentAlias *topod // unintentionally change the type of RDONLY tablets tablet := tm.Tablet() if tablet.Type == topodatapb.TabletType_MASTER { - if err := tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_REPLICA, DBActionNone); err != nil { + if err := tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_REPLICA); err != nil { return err } } @@ -620,7 +620,7 @@ func (tm *TabletManager) ReplicaWasRestarted(ctx context.Context, parent *topoda if tablet.Type != topodatapb.TabletType_MASTER { return nil } - return tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_REPLICA, DBActionNone) + return tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_REPLICA) } // StopReplicationAndGetStatus stops MySQL replication, and returns the @@ -731,7 +731,7 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context) (string, error) { return "", err } - if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_MASTER, DBActionSetReadWrite); err != nil { + if err := tm.changeTypeLocked(ctx, topodatapb.TabletType_MASTER); err != nil { return "", err } diff --git a/go/vt/vttablet/tabletmanager/tm_state.go b/go/vt/vttablet/tabletmanager/tm_state.go index 68ba9235c49..80a4d5e05b5 100644 --- a/go/vt/vttablet/tabletmanager/tm_state.go +++ b/go/vt/vttablet/tabletmanager/tm_state.go @@ -150,7 +150,7 @@ func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.Shar ts.updateLocked(ctx) } -func (ts *tmState) ChangeTabletType(ctx context.Context, tabletType topodatapb.TabletType, action DBAction) error { +func (ts *tmState) ChangeTabletType(ctx context.Context, tabletType topodatapb.TabletType) error { ts.mu.Lock() defer ts.mu.Unlock() log.Infof("Changing Tablet Type: %v", tabletType) @@ -163,12 +163,10 @@ func (ts *tmState) ChangeTabletType(ctx context.Context, tabletType topodatapb.T if err != nil { return err } - if action == DBActionSetReadWrite { - // We call SetReadOnly only after the topo has been updated to avoid - // situations where two tablets are master at the DB level but not at the vitess level - if err := ts.tm.MysqlDaemon.SetReadOnly(false); err != nil { - return err - } + // We call SetReadOnly only after the topo has been updated to avoid + // situations where two tablets are master at the DB level but not at the vitess level + if err := ts.tm.MysqlDaemon.SetReadOnly(false); err != nil { + return err } ts.tablet.Type = tabletType diff --git a/go/vt/vttablet/tabletmanager/tm_state_test.go b/go/vt/vttablet/tabletmanager/tm_state_test.go index 4de8c87d557..680fa41e08d 100644 --- a/go/vt/vttablet/tabletmanager/tm_state_test.go +++ b/go/vt/vttablet/tabletmanager/tm_state_test.go @@ -176,14 +176,14 @@ func TestStateChangeTabletType(t *testing.T) { Uid: 2, } - err := tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_MASTER, DBActionSetReadWrite) + err := tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_MASTER) require.NoError(t, err) ti, err := ts.GetTablet(ctx, alias) require.NoError(t, err) assert.Equal(t, topodatapb.TabletType_MASTER, ti.Type) assert.NotNil(t, ti.MasterTermStartTime) - err = tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_REPLICA, DBActionNone) + err = tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_REPLICA) require.NoError(t, err) ti, err = ts.GetTablet(ctx, alias) require.NoError(t, err) From 6dfdfdc7fbd516912ba6075ac5e9c6265256a0fd Mon Sep 17 00:00:00 2001 From: Rafael Chacon Date: Thu, 1 Oct 2020 09:45:05 -0400 Subject: [PATCH 15/15] Revert "Merge pull request #6721 from tinyspeck/fixes-long-wait-filter-keyspace-upstream" This reverts commit 6b9229e4a13f3e4e82767ad449446a7327b9f305. --- go/vt/discovery/healthcheck.go | 27 ------ go/vt/discovery/healthcheck_test.go | 89 ------------------- .../legacy_tablet_stats_cache_wait.go | 6 -- go/vt/vtgate/discoverygateway.go | 3 +- go/vt/vtgate/discoverygateway_test.go | 78 ---------------- go/vt/vttablet/tabletconn/tablet_conn.go | 8 -- 6 files changed, 1 insertion(+), 210 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 5a94a372b64..d0ca06a7062 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -612,30 +612,8 @@ func (hc *HealthCheckImpl) WaitForAllServingTablets(ctx context.Context, targets return hc.waitForTablets(ctx, targets, true) } -// FilterTargetsByKeyspaces only returns the targets that are part of the provided keyspaces -func FilterTargetsByKeyspaces(keyspaces []string, targets []*query.Target) []*query.Target { - filteredTargets := make([]*query.Target, 0) - - // Keep them all if there are no keyspaces to watch - if len(KeyspacesToWatch) == 0 { - return append(filteredTargets, targets...) - } - - // Let's remove from the target shards that are not in the keyspaceToWatch list. - for _, target := range targets { - for _, keyspaceToWatch := range keyspaces { - if target.Keyspace == keyspaceToWatch { - filteredTargets = append(filteredTargets, target) - } - } - } - return filteredTargets -} - // waitForTablets is the internal method that polls for tablets. func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.Target, requireServing bool) error { - targets = FilterTargetsByKeyspaces(KeyspacesToWatch, targets) - for { // We nil targets as we find them. allPresent := true @@ -667,11 +645,6 @@ func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query. select { case <-ctx.Done(): timer.Stop() - for _, target := range targets { - if target != nil { - log.Infof("couldn't find tablets for target: %v", target) - } - } return ctx.Err() case <-timer.C: } diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index a4b2483d47a..98bb5c57aea 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -44,7 +44,6 @@ import ( "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletconn" - "vitess.io/vitess/go/vt/proto/query" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -426,94 +425,6 @@ func TestHealthCheckTimeout(t *testing.T) { mustMatch(t, want, result, "Wrong TabletHealth data") } -func TestWaitForAllServingTablets(t *testing.T) { - ts := memorytopo.NewServer("cell") - hc := createTestHc(ts) - defer hc.Close() - tablet := createTestTablet(0, "cell", "a") - tablet.Type = topodatapb.TabletType_REPLICA - targets := []*query.Target{ - { - Keyspace: tablet.Keyspace, - Shard: tablet.Shard, - TabletType: tablet.Type, - }, - } - input := make(chan *querypb.StreamHealthResponse) - createFakeConn(tablet, input) - - // create a channel and subscribe to healthcheck - resultChan := hc.Subscribe() - hc.AddTablet(tablet) - // there will be a first result, get and discard it - <-resultChan - // empty - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - - err := hc.WaitForAllServingTablets(ctx, targets) - assert.NotNil(t, err, "error should not be nil") - - shr := &querypb.StreamHealthResponse{ - TabletAlias: tablet.Alias, - Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, - Serving: true, - TabletExternallyReparentedTimestamp: 0, - RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2}, - } - - input <- shr - <-resultChan - // // check it's there - - targets = []*query.Target{ - { - Keyspace: tablet.Keyspace, - Shard: tablet.Shard, - TabletType: tablet.Type, - }, - } - - err = hc.WaitForAllServingTablets(ctx, targets) - assert.Nil(t, err, "error should be nil. Targets are found") - - targets = []*query.Target{ - { - Keyspace: tablet.Keyspace, - Shard: tablet.Shard, - TabletType: tablet.Type, - }, - { - Keyspace: "newkeyspace", - Shard: tablet.Shard, - TabletType: tablet.Type, - }, - } - - err = hc.WaitForAllServingTablets(ctx, targets) - assert.NotNil(t, err, "error should not be nil (there are no tablets on this keyspace") - - targets = []*query.Target{ - { - Keyspace: tablet.Keyspace, - Shard: tablet.Shard, - TabletType: tablet.Type, - }, - { - Keyspace: "newkeyspace", - Shard: tablet.Shard, - TabletType: tablet.Type, - }, - } - - KeyspacesToWatch = []string{tablet.Keyspace} - - err = hc.WaitForAllServingTablets(ctx, targets) - assert.Nil(t, err, "error should be nil. Keyspace with no tablets is filtered") - - KeyspacesToWatch = []string{} -} - // TestGetHealthyTablets tests the functionality of GetHealthyTabletStats. func TestGetHealthyTablets(t *testing.T) { ts := memorytopo.NewServer("cell") diff --git a/go/vt/discovery/legacy_tablet_stats_cache_wait.go b/go/vt/discovery/legacy_tablet_stats_cache_wait.go index 976aa9e7760..d984ff7d1ce 100644 --- a/go/vt/discovery/legacy_tablet_stats_cache_wait.go +++ b/go/vt/discovery/legacy_tablet_stats_cache_wait.go @@ -21,7 +21,6 @@ import ( "golang.org/x/net/context" - "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -85,11 +84,6 @@ func (tc *LegacyTabletStatsCache) waitForTablets(ctx context.Context, targets [] timer := time.NewTimer(waitAvailableTabletInterval) select { case <-ctx.Done(): - for _, target := range targets { - if target != nil { - log.Infof("couldn't find tablets for target: %v", target) - } - } timer.Stop() return ctx.Err() case <-timer.C: diff --git a/go/vt/vtgate/discoverygateway.go b/go/vt/vtgate/discoverygateway.go index 11a47bdbfda..fbcb0becaa5 100644 --- a/go/vt/vtgate/discoverygateway.go +++ b/go/vt/vtgate/discoverygateway.go @@ -214,8 +214,7 @@ func (dg *DiscoveryGateway) WaitForTablets(ctx context.Context, tabletTypesToWai return err } - filteredTargets := discovery.FilterTargetsByKeyspaces(discovery.KeyspacesToWatch, targets) - return dg.tsc.WaitForAllServingTablets(ctx, filteredTargets) + return dg.tsc.WaitForAllServingTablets(ctx, targets) } // Close shuts down underlying connections. diff --git a/go/vt/vtgate/discoverygateway_test.go b/go/vt/vtgate/discoverygateway_test.go index dc09081baaf..95772daf592 100644 --- a/go/vt/vtgate/discoverygateway_test.go +++ b/go/vt/vtgate/discoverygateway_test.go @@ -20,7 +20,6 @@ import ( "fmt" "strings" "testing" - "time" "vitess.io/vitess/go/vt/log" @@ -35,7 +34,6 @@ import ( "vitess.io/vitess/go/vt/vterrors" querypb "vitess.io/vitess/go/vt/proto/query" - "vitess.io/vitess/go/vt/proto/topodata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) @@ -136,82 +134,6 @@ func TestDiscoveryGatewayGetTablets(t *testing.T) { } } -func TestDiscoveryGatewayWaitForTablets(t *testing.T) { - keyspace := "ks" - shard := "0" - cell := "local" - hc := discovery.NewFakeLegacyHealthCheck() - ts := memorytopo.NewServer("local") - srvTopo := srvtopotest.NewPassthroughSrvTopoServer() - srvTopo.TopoServer = ts - srvTopo.SrvKeyspaceNames = []string{keyspace} - srvTopo.SrvKeyspace = &topodatapb.SrvKeyspace{ - Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{ - { - ServedType: topodata.TabletType_MASTER, - ShardReferences: []*topodatapb.ShardReference{ - { - Name: shard, - }, - }, - }, - { - ServedType: topodata.TabletType_REPLICA, - ShardReferences: []*topodatapb.ShardReference{ - { - Name: shard, - }, - }, - }, - { - ServedType: topodata.TabletType_RDONLY, - ShardReferences: []*topodatapb.ShardReference{ - { - Name: shard, - }, - }, - }, - }, - } - - dg := NewDiscoveryGateway(context.Background(), hc, srvTopo, "local", 2) - - // replica should only use local ones - hc.Reset() - dg.tsc.ResetForTesting() - hc.AddTestTablet(cell, "2.2.2.2", 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil) - hc.AddTestTablet(cell, "1.1.1.1", 1001, keyspace, shard, topodatapb.TabletType_MASTER, true, 5, nil) - ctx, _ := context.WithTimeout(context.Background(), 1*time.Second) - err := dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_MASTER}) - if err != nil { - t.Errorf("want %+v, got %+v", nil, err) - } - - // fails if there are no available tablets for the desired TabletType - err = dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_RDONLY}) - if err == nil { - t.Errorf("expected error, got nil") - } - - // errors because there is no primary on ks2 - ctx, _ = context.WithTimeout(context.Background(), 1*time.Second) - srvTopo.SrvKeyspaceNames = []string{keyspace, "ks2"} - err = dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_MASTER}) - if err == nil { - t.Errorf("expected error, got nil") - } - - discovery.KeyspacesToWatch = []string{keyspace} - // does not wait for ks2 if it's not part of the filter - ctx, _ = context.WithTimeout(context.Background(), 1*time.Second) - err = dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_MASTER}) - if err != nil { - t.Errorf("want %+v, got %+v", nil, err) - } - - discovery.KeyspacesToWatch = []string{} -} - func TestShuffleTablets(t *testing.T) { ts1 := discovery.LegacyTabletStats{ Key: "t1", diff --git a/go/vt/vttablet/tabletconn/tablet_conn.go b/go/vt/vttablet/tabletconn/tablet_conn.go index 97a6ea2ee1b..2580c34fad0 100644 --- a/go/vt/vttablet/tabletconn/tablet_conn.go +++ b/go/vt/vttablet/tabletconn/tablet_conn.go @@ -18,7 +18,6 @@ package tabletconn import ( "flag" - "sync" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/log" @@ -51,14 +50,9 @@ type TabletDialer func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) var dialers = make(map[string]TabletDialer) -// mu This mutex helps us prevent data races when registering / getting dialers -var mu sync.Mutex - // RegisterDialer is meant to be used by TabletDialer implementations // to self register. func RegisterDialer(name string, dialer TabletDialer) { - mu.Lock() - defer mu.Unlock() if _, ok := dialers[name]; ok { log.Fatalf("Dialer %s already exists", name) } @@ -67,8 +61,6 @@ func RegisterDialer(name string, dialer TabletDialer) { // GetDialer returns the dialer to use, described by the command line flag func GetDialer() TabletDialer { - mu.Lock() - defer mu.Unlock() td, ok := dialers[*TabletProtocol] if !ok { log.Exitf("No dialer registered for tablet protocol %s", *TabletProtocol)