Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 49 additions & 10 deletions go/test/endtoend/orchestrator/orc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

func createCluster(t *testing.T, numReplicas int, numRdonly int) *cluster.LocalProcessCluster {
func createCluster(t *testing.T, numReplicas int, numRdonly int, orcExtraArgs []string) *cluster.LocalProcessCluster {
keyspaceName := "ks"
shardName := "0"
keyspace := &cluster.Keyspace{Name: keyspaceName}
Expand Down Expand Up @@ -113,6 +113,7 @@ func createCluster(t *testing.T, numReplicas int, numRdonly int) *cluster.LocalP

// Start orchestrator
clusterInstance.OrcProcess = clusterInstance.NewOrcProcess(path.Join(os.Getenv("PWD"), "test_config.json"))
clusterInstance.OrcProcess.ExtraArgs = orcExtraArgs
err = clusterInstance.OrcProcess.Setup()
require.NoError(t, err)

Expand All @@ -125,7 +126,45 @@ func createCluster(t *testing.T, numReplicas int, numRdonly int) *cluster.LocalP
// verify replication is setup
// TestMasterElection creates a cluster with 1 replica and 1 rdonly and lets
// orchestrator choose the master. It verifies that the rdonly is not elected
// (only the replica can be) and that replication is set up to the other tablets.
func TestMasterElection(t *testing.T) {
	defer cluster.PanicHandler(t)
	// nil extra args: orchestrator watches all clusters in the topology.
	clusterInstance := createCluster(t, 1, 1, nil)
	keyspace := &clusterInstance.Keyspaces[0]
	shard0 := &keyspace.Shards[0]
	defer func() {
		clusterInstance.Teardown()
		killTablets(t, shard0)
	}()

	checkMasterTablet(t, clusterInstance, shard0.Vttablets[0])
	checkReplication(t, clusterInstance, shard0.Vttablets[0], shard0.Vttablets[1:])
}

// Cases to test:
// 1. create cluster with 1 replica and 1 rdonly, let orc choose master
// verify rdonly is not elected, only replica
// verify replication is setup
// TestSingleKeyspace restricts orchestrator to a single keyspace via
// -clusters_to_watch and verifies master election and replication still work.
func TestSingleKeyspace(t *testing.T) {
	defer cluster.PanicHandler(t)

	// Watch only keyspace "ks" (no shard qualifier).
	clusterInstance := createCluster(t, 1, 1, []string{"-clusters_to_watch", "ks"})
	ks := &clusterInstance.Keyspaces[0]
	shard := &ks.Shards[0]
	defer func() {
		clusterInstance.Teardown()
		killTablets(t, shard)
	}()

	// Orchestrator should elect tablet 0 as master and replicate to the rest.
	checkMasterTablet(t, clusterInstance, shard.Vttablets[0])
	checkReplication(t, clusterInstance, shard.Vttablets[0], shard.Vttablets[1:])
}

// Cases to test:
// 1. create cluster with 1 replica and 1 rdonly, let orc choose master
// verify rdonly is not elected, only replica
// verify replication is setup
func TestKeyspaceShard(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := createCluster(t, 1, 1, []string{"-clusters_to_watch", "ks/0"})
keyspace := &clusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
defer func() {
Expand All @@ -141,7 +180,7 @@ func TestMasterElection(t *testing.T) {
// 2. bring down master, let orc promote replica
func TestDownMaster(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := createCluster(t, 2, 0)
clusterInstance := createCluster(t, 2, 0, nil)
keyspace := &clusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
defer func() {
Expand All @@ -168,7 +207,7 @@ func TestDownMaster(t *testing.T) {
// 3. make master readonly, let orc repair
func TestMasterReadOnly(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := createCluster(t, 2, 0)
clusterInstance := createCluster(t, 2, 0, nil)
keyspace := &clusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
defer func() {
Expand Down Expand Up @@ -196,7 +235,7 @@ func TestMasterReadOnly(t *testing.T) {
// 4. make replica ReadWrite, let orc repair
func TestReplicaReadWrite(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := createCluster(t, 2, 0)
clusterInstance := createCluster(t, 2, 0, nil)
keyspace := &clusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
defer func() {
Expand Down Expand Up @@ -232,7 +271,7 @@ func TestReplicaReadWrite(t *testing.T) {
// 5. stop replication, let orc repair
func TestStopReplication(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := createCluster(t, 2, 0)
clusterInstance := createCluster(t, 2, 0, nil)
keyspace := &clusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
defer func() {
Expand Down Expand Up @@ -271,7 +310,7 @@ func TestStopReplication(t *testing.T) {
// 6. setup replication from non-master, let orc repair
func TestReplicationFromOtherReplica(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := createCluster(t, 3, 0)
clusterInstance := createCluster(t, 3, 0, nil)
keyspace := &clusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
defer func() {
Expand Down Expand Up @@ -322,7 +361,7 @@ func TestRepairAfterTER(t *testing.T) {
// test fails intermittently on CI, skip until it can be fixed.
t.SkipNow()
defer cluster.PanicHandler(t)
clusterInstance := createCluster(t, 2, 0)
clusterInstance := createCluster(t, 2, 0, nil)
keyspace := &clusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
defer func() {
Expand Down Expand Up @@ -469,8 +508,8 @@ func checkInsertedValues(t *testing.T, tablet *cluster.Vttablet, index int) erro

func validateTopology(t *testing.T, cluster *cluster.LocalProcessCluster, pingTablets bool) {
if pingTablets {
err := cluster.VtctlclientProcess.ExecuteCommand("Validate", "-ping-tablets=true")
require.NoError(t, err)
out, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("Validate", "-ping-tablets=true")
require.NoError(t, err, out)
} else {
err := cluster.VtctlclientProcess.ExecuteCommand("Validate")
require.NoError(t, err)
Expand Down
5 changes: 5 additions & 0 deletions go/vt/orchestrator/db/generate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,8 @@ var generateSQLBase = []string{
CREATE TABLE IF NOT EXISTS vitess_tablet (
hostname varchar(128) CHARACTER SET ascii NOT NULL,
port smallint(5) unsigned NOT NULL,
keyspace varchar(128) CHARACTER SET ascii NOT NULL,
shard varchar(128) CHARACTER SET ascii NOT NULL,
cell varchar(128) CHARACTER SET ascii NOT NULL,
tablet_type smallint(5) NOT NULL,
master_timestamp timestamp NOT NULL,
Expand All @@ -865,4 +867,7 @@ var generateSQLBase = []string{
`
CREATE INDEX cell_idx_vitess_tablet ON vitess_tablet (cell)
`,
`
CREATE INDEX ks_idx_vitess_tablet ON vitess_tablet (keyspace, shard)
`,
}
6 changes: 4 additions & 2 deletions go/vt/orchestrator/inst/tablet_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,16 @@ func SaveTablet(tablet *topodatapb.Tablet) error {
_, err := db.ExecOrchestrator(`
replace
into vitess_tablet (
hostname, port, cell, tablet_type, master_timestamp, info
hostname, port, cell, keyspace, shard, tablet_type, master_timestamp, info
) values (
?, ?, ?, ?, ?, ?
?, ?, ?, ?, ?, ?, ?, ?
)
`,
tablet.MysqlHostname,
int(tablet.MysqlPort),
tablet.Alias.Cell,
tablet.Keyspace,
tablet.Shard,
int(tablet.Type),
logutil.ProtoToTime(tablet.MasterTermStartTime),
proto.CompactTextString(tablet),
Expand Down
109 changes: 86 additions & 23 deletions go/vt/orchestrator/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package logic
import (
"context"
"errors"
"flag"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -37,7 +39,8 @@ import (
)

var (
ts *topo.Server
ts *topo.Server
clustersToWatch = flag.String("clusters_to_watch", "", "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
)

// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker
Expand Down Expand Up @@ -69,38 +72,99 @@ func refreshTabletsUsing(loader func(instanceKey *inst.InstanceKey)) {
if !IsLeaderOrActive() {
return
}
ctx, cancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
defer cancel()
cells, err := ts.GetKnownCells(ctx)
if err != nil {
log.Errore(err)
return
}
if *clustersToWatch == "" { // all known clusters
ctx, cancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
defer cancel()
cells, err := ts.GetKnownCells(ctx)
if err != nil {
log.Errore(err)
return
}

refreshCtx, refreshCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
for _, cell := range cells {
wg.Add(1)
go func(cell string) {
defer wg.Done()
refreshTabletsInCell(refreshCtx, cell, loader)
}(cell)
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
for _, cell := range cells {
wg.Add(1)
go func(cell string) {
defer wg.Done()
refreshTabletsInCell(refreshCtx, cell, loader)
}(cell)
}
wg.Wait()
} else {
// Parse input and build list of keyspaces / shards
var keyspaceShards []*topo.KeyspaceShard
inputs := strings.Split(*clustersToWatch, ",")
for _, ks := range inputs {
if strings.Contains(ks, "/") {
// This is a keyspace/shard specification
input := strings.Split(ks, "/")
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]})
} else {
// Assume this is a keyspace and find all shards in keyspace
ctx, cancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
defer cancel()
shards, err := ts.GetShardNames(ctx, ks)
if err != nil {
// Log the error and continue
log.Errorf("Error fetching shards for keyspace: %v", ks)
continue
}
if len(shards) == 0 {
log.Errorf("Topo has no shards for ks: %v", ks)
continue
}
for _, s := range shards {
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s})
}
}
}
if len(keyspaceShards) == 0 {
log.Errorf("Found no keyspaceShards for input: %v", *clustersToWatch)
return
}
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), *topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
for _, ks := range keyspaceShards {
wg.Add(1)
go func(ks *topo.KeyspaceShard) {
defer wg.Done()
refreshTabletsInKeyspaceShard(refreshCtx, ks.Keyspace, ks.Shard, loader)
}(ks)
}
wg.Wait()
}
wg.Wait()
}

func refreshTabletsInCell(ctx context.Context, cell string, loader func(instanceKey *inst.InstanceKey)) {
latestInstances := make(map[inst.InstanceKey]bool)
tablets, err := topotools.GetAllTablets(ctx, ts, cell)
tablets, err := topotools.GetTabletMapForCell(ctx, ts, cell)
if err != nil {
log.Errorf("Error fetching topo info for cell %v: %v", cell, err)
return
}
query := "select hostname, port, info from vitess_tablet where cell = ?"
args := sqlutils.Args(cell)
refreshTablets(tablets, query, args, loader)
}

// refreshTabletsInKeyspaceShard reloads the tablet records for one
// keyspace/shard from the topo server and reconciles them against the
// orchestrator backend table via refreshTablets.
func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, loader func(instanceKey *inst.InstanceKey)) {
	tabletMap, err := ts.GetTabletMapForShard(ctx, keyspace, shard)
	if err != nil {
		// Topo errors are logged and the refresh is skipped for this shard.
		log.Errorf("Error fetching tablets for keyspace/shard %v/%v: %v", keyspace, shard, err)
		return
	}
	refreshTablets(
		tabletMap,
		"select hostname, port, info from vitess_tablet where keyspace = ? and shard = ?",
		sqlutils.Args(keyspace, shard),
		loader,
	)
}

func refreshTablets(tablets map[string]*topo.TabletInfo, query string, args []interface{}, loader func(instanceKey *inst.InstanceKey)) {
// Discover new tablets.
// TODO(sougou): enhance this to work with multi-schema,
// where each instanceKey can have multiple tablets.
latestInstances := make(map[inst.InstanceKey]bool)
for _, tabletInfo := range tablets {
tablet := tabletInfo.Tablet
if tablet.MysqlHostname == "" {
Expand Down Expand Up @@ -132,8 +196,7 @@ func refreshTabletsInCell(ctx context.Context, cell string, loader func(instance

// Forget tablets that were removed.
toForget := make(map[inst.InstanceKey]*topodatapb.Tablet)
query := "select hostname, port, info from vitess_tablet where cell = ?"
err = db.QueryOrchestrator(query, sqlutils.Args(cell), func(row sqlutils.RowMap) error {
err := db.QueryOrchestrator(query, args, func(row sqlutils.RowMap) error {
curKey := inst.InstanceKey{
Hostname: row.GetString("hostname"),
Port: row.GetInt("port"),
Expand All @@ -152,7 +215,7 @@ func refreshTabletsInCell(ctx context.Context, cell string, loader func(instance
log.Errore(err)
}
for instanceKey, tablet := range toForget {
log.Infof("Forgeting: %v", tablet)
log.Infof("Forgetting: %v", tablet)
_, err := db.ExecOrchestrator(`
delete
from vitess_tablet
Expand Down
14 changes: 14 additions & 0 deletions go/vt/topotools/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ func GetAllTablets(ctx context.Context, ts *topo.Server, cell string) ([]*topo.T
return tablets, nil
}

// GetTabletMapForCell returns a map of TabletInfo keyed by alias as string
func GetTabletMapForCell(ctx context.Context, ts *topo.Server, cell string) (map[string]*topo.TabletInfo, error) {
	aliases, err := ts.GetTabletsByCell(ctx, cell)
	if err != nil {
		return nil, err
	}
	tablets, err := ts.GetTabletMap(ctx, aliases)
	if err != nil {
		// NOTE(review): GetTabletMap appears to tolerate missing nodes
		// (topo.ErrNoNode) for individual aliases; any error surfaced here
		// is treated as fatal for the whole cell — confirm against topo docs.
		return nil, err
	}
	return tablets, nil
}

// GetAllTabletsAcrossCells returns all tablets from known cells.
// If it returns topo.ErrPartialResult, then the list is valid, but partial.
func GetAllTabletsAcrossCells(ctx context.Context, ts *topo.Server) ([]*topo.TabletInfo, error) {
Expand Down