From 1b2dc0f7d17a163902c2f36bd2c5e26ea26526c4 Mon Sep 17 00:00:00 2001 From: deepthi Date: Mon, 5 Oct 2020 12:35:16 -0700 Subject: [PATCH 1/2] Add clusters_to_watch flag to orchestrator. Defaults to all clusters Signed-off-by: deepthi --- go/test/endtoend/orchestrator/orc_test.go | 59 ++++++++-- go/vt/orchestrator/db/generate_base.go | 5 + go/vt/orchestrator/inst/tablet_dao.go | 6 +- go/vt/orchestrator/logic/tablet_discovery.go | 109 +++++++++++++++---- go/vt/topotools/utils.go | 14 +++ 5 files changed, 158 insertions(+), 35 deletions(-) diff --git a/go/test/endtoend/orchestrator/orc_test.go b/go/test/endtoend/orchestrator/orc_test.go index 3cde3801396..153879a9674 100644 --- a/go/test/endtoend/orchestrator/orc_test.go +++ b/go/test/endtoend/orchestrator/orc_test.go @@ -40,7 +40,7 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) -func createCluster(t *testing.T, numReplicas int, numRdonly int) *cluster.LocalProcessCluster { +func createCluster(t *testing.T, numReplicas int, numRdonly int, orcExtraArgs []string) *cluster.LocalProcessCluster { keyspaceName := "ks" shardName := "0" keyspace := &cluster.Keyspace{Name: keyspaceName} @@ -113,6 +113,7 @@ func createCluster(t *testing.T, numReplicas int, numRdonly int) *cluster.LocalP // Start orchestrator clusterInstance.OrcProcess = clusterInstance.NewOrcProcess(path.Join(os.Getenv("PWD"), "test_config.json")) + clusterInstance.OrcProcess.ExtraArgs = orcExtraArgs err = clusterInstance.OrcProcess.Setup() require.NoError(t, err) @@ -125,7 +126,45 @@ func createCluster(t *testing.T, numReplicas int, numRdonly int) *cluster.LocalP // verify replication is setup func TestMasterElection(t *testing.T) { defer cluster.PanicHandler(t) - clusterInstance := createCluster(t, 1, 1) + clusterInstance := createCluster(t, 1, 1, nil) + keyspace := &clusterInstance.Keyspaces[0] + shard0 := &keyspace.Shards[0] + defer func() { + clusterInstance.Teardown() + killTablets(t, shard0) + }() + + //log.Exitf("error") + 
checkMasterTablet(t, clusterInstance, shard0.Vttablets[0]) + checkReplication(t, clusterInstance, shard0.Vttablets[0], shard0.Vttablets[1:]) +} + +// Cases to test: +// 1. create cluster with 1 replica and 1 rdonly, let orc choose master +// verify rdonly is not elected, only replica +// verify replication is setup +func TestSingleKeyspace(t *testing.T) { + defer cluster.PanicHandler(t) + clusterInstance := createCluster(t, 1, 1, []string{"-clusters_to_watch", "ks"}) + keyspace := &clusterInstance.Keyspaces[0] + shard0 := &keyspace.Shards[0] + defer func() { + clusterInstance.Teardown() + killTablets(t, shard0) + }() + + //log.Exitf("error") + checkMasterTablet(t, clusterInstance, shard0.Vttablets[0]) + checkReplication(t, clusterInstance, shard0.Vttablets[0], shard0.Vttablets[1:]) +} + +// Cases to test: +// 1. create cluster with 1 replica and 1 rdonly, let orc choose master +// verify rdonly is not elected, only replica +// verify replication is setup +func TestKeyspaceShard(t *testing.T) { + defer cluster.PanicHandler(t) + clusterInstance := createCluster(t, 1, 1, []string{"-clusters_to_watch", "ks/0"}) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] defer func() { @@ -141,7 +180,7 @@ func TestMasterElection(t *testing.T) { // 2. bring down master, let orc promote replica func TestDownMaster(t *testing.T) { defer cluster.PanicHandler(t) - clusterInstance := createCluster(t, 2, 0) + clusterInstance := createCluster(t, 2, 0, nil) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] defer func() { @@ -168,7 +207,7 @@ func TestDownMaster(t *testing.T) { // 3. make master readonly, let orc repair func TestMasterReadOnly(t *testing.T) { defer cluster.PanicHandler(t) - clusterInstance := createCluster(t, 2, 0) + clusterInstance := createCluster(t, 2, 0, nil) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] defer func() { @@ -196,7 +235,7 @@ func TestMasterReadOnly(t *testing.T) { // 4. 
make replica ReadWrite, let orc repair func TestReplicaReadWrite(t *testing.T) { defer cluster.PanicHandler(t) - clusterInstance := createCluster(t, 2, 0) + clusterInstance := createCluster(t, 2, 0, nil) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] defer func() { @@ -232,7 +271,7 @@ func TestReplicaReadWrite(t *testing.T) { // 5. stop replication, let orc repair func TestStopReplication(t *testing.T) { defer cluster.PanicHandler(t) - clusterInstance := createCluster(t, 2, 0) + clusterInstance := createCluster(t, 2, 0, nil) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] defer func() { @@ -271,7 +310,7 @@ func TestStopReplication(t *testing.T) { // 6. setup replication from non-master, let orc repair func TestReplicationFromOtherReplica(t *testing.T) { defer cluster.PanicHandler(t) - clusterInstance := createCluster(t, 3, 0) + clusterInstance := createCluster(t, 3, 0, nil) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] defer func() { @@ -322,7 +361,7 @@ func TestRepairAfterTER(t *testing.T) { // test fails intermittently on CI, skip until it can be fixed. 
t.SkipNow() defer cluster.PanicHandler(t) - clusterInstance := createCluster(t, 2, 0) + clusterInstance := createCluster(t, 2, 0, nil) keyspace := &clusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] defer func() { @@ -469,8 +508,8 @@ func checkInsertedValues(t *testing.T, tablet *cluster.Vttablet, index int) erro func validateTopology(t *testing.T, cluster *cluster.LocalProcessCluster, pingTablets bool) { if pingTablets { - err := cluster.VtctlclientProcess.ExecuteCommand("Validate", "-ping-tablets=true") - require.NoError(t, err) + out, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("Validate", "-ping-tablets=true") + require.NoError(t, err, out) } else { err := cluster.VtctlclientProcess.ExecuteCommand("Validate") require.NoError(t, err) diff --git a/go/vt/orchestrator/db/generate_base.go b/go/vt/orchestrator/db/generate_base.go index a2cdb3fad55..d95dbb6b8a6 100644 --- a/go/vt/orchestrator/db/generate_base.go +++ b/go/vt/orchestrator/db/generate_base.go @@ -856,6 +856,8 @@ var generateSQLBase = []string{ hostname varchar(128) CHARACTER SET ascii NOT NULL, port smallint(5) unsigned NOT NULL, cell varchar(128) CHARACTER SET ascii NOT NULL, + keyspace varchar(128) CHARACTER SET ascii NOT NULL, + shard varchar(128) CHARACTER SET ascii NOT NULL, tablet_type smallint(5) NOT NULL, master_timestamp timestamp NOT NULL, info varchar(512) CHARACTER SET ascii NOT NULL, @@ -865,4 +867,7 @@ var generateSQLBase = []string{ ` CREATE INDEX cell_idx_vitess_tablet ON vitess_tablet (cell) `, + ` + CREATE INDEX ks_idx_vitess_tablet ON vitess_tablet (keyspace, shard) + `, } diff --git a/go/vt/orchestrator/inst/tablet_dao.go b/go/vt/orchestrator/inst/tablet_dao.go index 36973c3b642..52adecfccfa 100644 --- a/go/vt/orchestrator/inst/tablet_dao.go +++ b/go/vt/orchestrator/inst/tablet_dao.go @@ -139,14 +139,16 @@ func SaveTablet(tablet *topodatapb.Tablet) error { _, err := db.ExecOrchestrator(` replace into vitess_tablet ( - hostname, port, cell, tablet_type, 
master_timestamp, info + hostname, port, cell, keyspace, shard, tablet_type, master_timestamp, info ) values ( - ?, ?, ?, ?, ?, ? + ?, ?, ?, ?, ?, ?, ?, ? ) `, tablet.MysqlHostname, int(tablet.MysqlPort), tablet.Alias.Cell, + tablet.Keyspace, + tablet.Shard, int(tablet.Type), logutil.ProtoToTime(tablet.MasterTermStartTime), proto.CompactTextString(tablet), diff --git a/go/vt/orchestrator/logic/tablet_discovery.go b/go/vt/orchestrator/logic/tablet_discovery.go index 6a44273aab8..75b63f6ac37 100644 --- a/go/vt/orchestrator/logic/tablet_discovery.go +++ b/go/vt/orchestrator/logic/tablet_discovery.go @@ -19,7 +19,9 @@ package logic import ( "context" "errors" + "flag" "fmt" + "strings" "sync" "time" @@ -37,7 +39,8 @@ import ( ) var ( - ts *topo.Server + ts *topo.Server + clustersToWatch = flag.String("clusters_to_watch", "", "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") ) // OpenTabletDiscovery opens the vitess topo if enables and returns a ticker @@ -69,38 +72,99 @@ func refreshTabletsUsing(loader func(instanceKey *inst.InstanceKey)) { if !IsLeaderOrActive() { return } - ctx, cancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout) - defer cancel() - cells, err := ts.GetKnownCells(ctx) - if err != nil { - log.Errore(err) - return - } + if *clustersToWatch == "" { // all known clusters + ctx, cancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout) + defer cancel() + cells, err := ts.GetKnownCells(ctx) + if err != nil { + log.Errore(err) + return + } - refreshCtx, refreshCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout) - defer refreshCancel() - var wg sync.WaitGroup - for _, cell := range cells { - wg.Add(1) - go func(cell string) { - defer wg.Done() - refreshTabletsInCell(refreshCtx, cell, loader) - }(cell) + refreshCtx, refreshCancel := 
context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout) + defer refreshCancel() + var wg sync.WaitGroup + for _, cell := range cells { + wg.Add(1) + go func(cell string) { + defer wg.Done() + refreshTabletsInCell(refreshCtx, cell, loader) + }(cell) + } + wg.Wait() + } else { + // Parse input and build list of keyspaces / shards + var keyspaceShards []*topo.KeyspaceShard + inputs := strings.Split(*clustersToWatch, ",") + for _, ks := range inputs { + if strings.Contains(ks, "/") { + // This is a keyspace/shard specification + input := strings.Split(ks, "/") + keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]}) + } else { + // Assume this is a keyspace and find all shards in keyspace + ctx, cancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout) + defer cancel() + shards, err := ts.GetShardNames(ctx, ks) + if err != nil { + // Log the error and continue + log.Errorf("Error fetching shards for keyspace: %v", ks) + continue + } + if len(shards) == 0 { + log.Errorf("Topo has no shards for ks: %v", ks) + continue + } + for _, s := range shards { + keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s}) + } + } + } + if len(keyspaceShards) == 0 { + log.Errorf("Found no keyspaceShards for input: %v", *clustersToWatch) + return + } + refreshCtx, refreshCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout) + defer refreshCancel() + var wg sync.WaitGroup + for _, ks := range keyspaceShards { + wg.Add(1) + go func(ks *topo.KeyspaceShard) { + defer wg.Done() + refreshTabletsInKeyspaceShard(refreshCtx, ks.Keyspace, ks.Shard, loader) + }(ks) + } + wg.Wait() } - wg.Wait() } func refreshTabletsInCell(ctx context.Context, cell string, loader func(instanceKey *inst.InstanceKey)) { - latestInstances := make(map[inst.InstanceKey]bool) - tablets, err := topotools.GetAllTablets(ctx, ts, cell) + tablets, err := topotools.GetTabletMapForCell(ctx, 
ts, cell) if err != nil { log.Errorf("Error fetching topo info for cell %v: %v", cell, err) return } + query := "select hostname, port, info from vitess_tablet where cell = ?" + args := sqlutils.Args(cell) + refreshTablets(tablets, query, args, loader) +} + +func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, loader func(instanceKey *inst.InstanceKey)) { + tablets, err := ts.GetTabletMapForShard(ctx, keyspace, shard) + if err != nil { + log.Errorf("Error fetching tablets for keyspace/shard %v/%v: %v", keyspace, shard, err) + return + } + query := "select hostname, port, info from vitess_tablet where keyspace = ? and shard = ?" + args := sqlutils.Args(keyspace, shard) + refreshTablets(tablets, query, args, loader) +} +func refreshTablets(tablets map[string]*topo.TabletInfo, query string, args []interface{}, loader func(instanceKey *inst.InstanceKey)) { // Discover new tablets. // TODO(sougou): enhance this to work with multi-schema, // where each instanceKey can have multiple tablets. + latestInstances := make(map[inst.InstanceKey]bool) for _, tabletInfo := range tablets { tablet := tabletInfo.Tablet if tablet.MysqlHostname == "" { @@ -132,8 +196,7 @@ func refreshTabletsInCell(ctx context.Context, cell string, loader func(instance // Forget tablets that were removed. toForget := make(map[inst.InstanceKey]*topodatapb.Tablet) - query := "select hostname, port, info from vitess_tablet where cell = ?" 
- err = db.QueryOrchestrator(query, sqlutils.Args(cell), func(row sqlutils.RowMap) error { + err := db.QueryOrchestrator(query, args, func(row sqlutils.RowMap) error { curKey := inst.InstanceKey{ Hostname: row.GetString("hostname"), Port: row.GetInt("port"), @@ -152,7 +215,7 @@ func refreshTabletsInCell(ctx context.Context, cell string, loader func(instance log.Errore(err) } for instanceKey, tablet := range toForget { - log.Infof("Forgeting: %v", tablet) + log.Infof("Forgetting: %v", tablet) _, err := db.ExecOrchestrator(` delete from vitess_tablet diff --git a/go/vt/topotools/utils.go b/go/vt/topotools/utils.go index e2d27b160ab..c0d6aefe68a 100644 --- a/go/vt/topotools/utils.go +++ b/go/vt/topotools/utils.go @@ -68,6 +68,20 @@ func GetAllTablets(ctx context.Context, ts *topo.Server, cell string) ([]*topo.T return tablets, nil } +// GetTabletMapForCell returns a map of TabletInfo keyed by alias as string +func GetTabletMapForCell(ctx context.Context, ts *topo.Server, cell string) (map[string]*topo.TabletInfo, error) { + aliases, err := ts.GetTabletsByCell(ctx, cell) + if err != nil { + return nil, err + } + tabletMap, err := ts.GetTabletMap(ctx, aliases) + if err != nil { + // we got another error than topo.ErrNoNode + return nil, err + } + return tabletMap, nil +} + // GetAllTabletsAcrossCells returns all tablets from known cells. // If it returns topo.ErrPartialResult, then the list is valid, but partial. 
func GetAllTabletsAcrossCells(ctx context.Context, ts *topo.Server) ([]*topo.TabletInfo, error) { From 326bf2d0ccd6a3b83ddbbbc6ffc7ab2597239b97 Mon Sep 17 00:00:00 2001 From: deepthi Date: Mon, 5 Oct 2020 13:17:39 -0700 Subject: [PATCH 2/2] column order in vitess_tablet should be keyspace,shard,cell Signed-off-by: deepthi --- go/vt/orchestrator/db/generate_base.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/orchestrator/db/generate_base.go b/go/vt/orchestrator/db/generate_base.go index d95dbb6b8a6..e72fd87b868 100644 --- a/go/vt/orchestrator/db/generate_base.go +++ b/go/vt/orchestrator/db/generate_base.go @@ -855,9 +855,9 @@ var generateSQLBase = []string{ CREATE TABLE IF NOT EXISTS vitess_tablet ( hostname varchar(128) CHARACTER SET ascii NOT NULL, port smallint(5) unsigned NOT NULL, - cell varchar(128) CHARACTER SET ascii NOT NULL, keyspace varchar(128) CHARACTER SET ascii NOT NULL, shard varchar(128) CHARACTER SET ascii NOT NULL, + cell varchar(128) CHARACTER SET ascii NOT NULL, tablet_type smallint(5) NOT NULL, master_timestamp timestamp NOT NULL, info varchar(512) CHARACTER SET ascii NOT NULL,