diff --git a/go/test/endtoend/orchestrator/orc_test.go b/go/test/endtoend/orchestrator/orc_test.go index e0da62d1baa..5c9870a7b47 100644 --- a/go/test/endtoend/orchestrator/orc_test.go +++ b/go/test/endtoend/orchestrator/orc_test.go @@ -475,8 +475,8 @@ func checkInsertedValues(t *testing.T, tablet *cluster.Vttablet, index int) erro func validateTopology(t *testing.T, cluster *cluster.LocalProcessCluster, pingTablets bool) { if pingTablets { - err := cluster.VtctlclientProcess.ExecuteCommand("Validate", "-ping-tablets=true") - require.NoError(t, err) + out, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("Validate", "-ping-tablets=true") + require.NoError(t, err, out) } else { err := cluster.VtctlclientProcess.ExecuteCommand("Validate") require.NoError(t, err) diff --git a/go/test/endtoend/orchestrator/test_config.json b/go/test/endtoend/orchestrator/test_config.json index 7494692bb7f..8344ed389ff 100644 --- a/go/test/endtoend/orchestrator/test_config.json +++ b/go/test/endtoend/orchestrator/test_config.json @@ -5,5 +5,7 @@ "MySQLReplicaUser": "vt_repl", "MySQLReplicaPassword": "", "RecoveryPeriodBlockSeconds": 1, - "InstancePollSeconds": 1 + "InstancePollSeconds": 1, + "Keyspace": "ks", + "Shard": "0" } diff --git a/go/vt/orchestrator/config/config.go b/go/vt/orchestrator/config/config.go index 360245a0ad1..dfadc717c85 100644 --- a/go/vt/orchestrator/config/config.go +++ b/go/vt/orchestrator/config/config.go @@ -255,6 +255,8 @@ type Configuration struct { MaxConcurrentReplicaOperations int // Maximum number of concurrent operations on replicas InstanceDBExecContextTimeoutSeconds int // Timeout on context used while calling ExecContext on instance database LockShardTimeoutSeconds int // Timeout on context used to lock shard. Should be a small value because we should fail-fast + Keyspace string // Keyspace that this instance is watching + Shard string // Shard that this instance is watching } // ToJSONString will marshal this configuration as JSON @@ -429,6 +431,9 @@ func newConfiguration() *Configuration { } func (this *Configuration) postReadAdjustments() error { + if this.Keyspace == "" || this.Shard == "" { + log.Fatalf("Keyspace and Shard are required") + } if this.MySQLOrchestratorCredentialsConfigFile != "" { mySQLConfig := struct { Client struct { diff --git a/go/vt/orchestrator/logic/tablet_discovery.go b/go/vt/orchestrator/logic/tablet_discovery.go index 6a44273aab8..bbcc33509ec 100644 --- a/go/vt/orchestrator/logic/tablet_discovery.go +++ b/go/vt/orchestrator/logic/tablet_discovery.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "sync" "time" "vitess.io/vitess/go/vt/orchestrator/config" @@ -32,7 +31,6 @@ import ( "vitess.io/vitess/go/vt/orchestrator/inst" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vttablet/tmclient" ) @@ -71,36 +69,16 @@ func refreshTabletsUsing(loader func(instanceKey *inst.InstanceKey)) { } ctx, cancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout) defer cancel() - cells, err := ts.GetKnownCells(ctx) + tablets, err := ts.GetTabletMapForShard(ctx, config.Config.Keyspace, config.Config.Shard) if err != nil { - log.Errore(err) - return - } - - refreshCtx, refreshCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout) - defer refreshCancel() - var wg sync.WaitGroup - for _, cell := range cells { - wg.Add(1) - go func(cell string) { - defer wg.Done() - refreshTabletsInCell(refreshCtx, cell, loader) - }(cell) - } - wg.Wait() -} - -func refreshTabletsInCell(ctx context.Context, cell string, loader func(instanceKey *inst.InstanceKey)) { - latestInstances := make(map[inst.InstanceKey]bool) - tablets, err := topotools.GetAllTablets(ctx, ts, cell) - if err != nil { - log.Errorf("Error fetching topo info for cell %v: %v", cell, err) + log.Errorf("Error fetching topo info for keyspace/shard %v/%v: %v", config.Config.Keyspace, config.Config.Shard, err) return } // Discover new tablets. // TODO(sougou): enhance this to work with multi-schema, // where each instanceKey can have multiple tablets. + latestInstances := make(map[inst.InstanceKey]bool) for _, tabletInfo := range tablets { tablet := tabletInfo.Tablet if tablet.MysqlHostname == "" { @@ -132,8 +110,8 @@ func refreshTabletsInCell(ctx context.Context, cell string, loader func(instance // Forget tablets that were removed. toForget := make(map[inst.InstanceKey]*topodatapb.Tablet) - query := "select hostname, port, info from vitess_tablet where cell = ?" - err = db.QueryOrchestrator(query, sqlutils.Args(cell), func(row sqlutils.RowMap) error { + query := "select hostname, port, info from vitess_tablet" + err = db.QueryOrchestrator(query, nil, func(row sqlutils.RowMap) error { curKey := inst.InstanceKey{ Hostname: row.GetString("hostname"), Port: row.GetInt("port"), @@ -152,7 +130,7 @@ func refreshTabletsInCell(ctx context.Context, cell string, loader func(instance log.Errore(err) } for instanceKey, tablet := range toForget { - log.Infof("Forgeting: %v", tablet) + log.Infof("Forgetting: %v", tablet) _, err := db.ExecOrchestrator(` delete from vitess_tablet