Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 102 additions & 82 deletions go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"os"
"os/exec"
"strings"
_ "strings"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -91,7 +92,7 @@ func NewVitessCluster(name string) (cluster *VitessCluster, err error) {
}

// InitCluster creates the global processes needed for a cluster
func InitCluster(t *testing.T, cellName string) *VitessCluster {
func InitCluster(t *testing.T, cellNames []string) *VitessCluster {
initGlobals()
vc, _ := NewVitessCluster("Vdemo")
assert.NotNil(t, vc)
Expand All @@ -101,20 +102,25 @@ func InitCluster(t *testing.T, cellName string) *VitessCluster {
assert.Nil(t, topo.Setup("etcd2", nil))
topo.ManageTopoDir("mkdir", "/vitess/global")
vc.Topo = topo
topo.ManageTopoDir("mkdir", "/vitess/"+cellName)
for _, cellName := range cellNames {
topo.ManageTopoDir("mkdir", "/vitess/"+cellName)
}

vtctld := cluster.VtctldProcessInstance(globalConfig.vtctldPort, globalConfig.vtctldGrpcPort,
globalConfig.topoPort, globalConfig.hostname, globalConfig.tmpDir)
vc.Vtctld = vtctld
assert.NotNil(t, vc.Vtctld)
vc.Vtctld.Setup(cellName)
// use first cell as `-cell` and all cells as `-cells_to_watch`
vc.Vtctld.Setup(cellNames[0], "-cells_to_watch", strings.Join(cellNames, ","))

vc.Vtctl = cluster.VtctlProcessInstance(globalConfig.topoPort, globalConfig.hostname)
assert.NotNil(t, vc.Vtctl)
vc.Vtctl.AddCellInfo(cellName)
cell, err := vc.AddCell(t, cellName)
assert.Nil(t, err)
assert.NotNil(t, cell)
for _, cellName := range cellNames {
vc.Vtctl.AddCellInfo(cellName)
cell, err := vc.AddCell(t, cellName)
assert.Nil(t, err)
assert.NotNil(t, cell)
}

vc.VtctlClient = cluster.VtctlClientProcessInstance(globalConfig.hostname, vc.Vtctld.GrpcPort, globalConfig.tmpDir)
assert.NotNil(t, vc.VtctlClient)
Expand All @@ -123,7 +129,7 @@ func InitCluster(t *testing.T, cellName string) *VitessCluster {
}

// AddKeyspace creates a keyspace with specified shard keys and number of replica/read-only tablets
func (vc *VitessCluster) AddKeyspace(t *testing.T, cell *Cell, ksName string, shards string, vschema string, schema string, numReplicas int, numRdonly int, tabletIDBase int) (*Keyspace, error) {
func (vc *VitessCluster) AddKeyspace(t *testing.T, cells []*Cell, ksName string, shards string, vschema string, schema string, numReplicas int, numRdonly int, tabletIDBase int) (*Keyspace, error) {
keyspace := &Keyspace{
Name: ksName,
Shards: make(map[string]*Shard),
Expand All @@ -132,10 +138,16 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cell *Cell, ksName string, sh
if err := vc.Vtctl.CreateKeyspace(keyspace.Name); err != nil {
t.Fatalf(err.Error())
}
cell.Keyspaces[ksName] = keyspace
if err := vc.AddShards(t, cell, keyspace, shards, numReplicas, numRdonly, tabletIDBase); err != nil {
t.Fatalf(err.Error())
cellsToWatch := ""
for i, cell := range cells {
if i > 0 {
cellsToWatch = cellsToWatch + ","
}
cell.Keyspaces[ksName] = keyspace
cellsToWatch = cellsToWatch + cell.Name
}
require.NoError(t, vc.AddShards(t, cells, keyspace, shards, numReplicas, numRdonly, tabletIDBase))

if schema != "" {
if err := vc.VtctlClient.ApplySchema(ksName, schema); err != nil {
t.Fatalf(err.Error())
Expand All @@ -148,9 +160,11 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cell *Cell, ksName string, sh
}
}
keyspace.VSchema = vschema
if len(cell.Vtgates) == 0 {
fmt.Println("Starting vtgate")
vc.StartVtgate(t, cell)
for _, cell := range cells {
if len(cell.Vtgates) == 0 {
fmt.Println("Starting vtgate")
vc.StartVtgate(t, cell, cellsToWatch)
}
}
_ = vc.VtctlClient.ExecuteCommand("RebuildKeyspaceGraph", ksName)
return keyspace, nil
Expand Down Expand Up @@ -194,87 +208,92 @@ func (vc *VitessCluster) AddTablet(t *testing.T, cell *Cell, keyspace *Keyspace,
}

// AddShards creates shards given list of comma-separated keys with specified tablets in each shard
func (vc *VitessCluster) AddShards(t *testing.T, cell *Cell, keyspace *Keyspace, names string, numReplicas int, numRdonly int, tabletIDBase int) error {
func (vc *VitessCluster) AddShards(t *testing.T, cells []*Cell, keyspace *Keyspace, names string, numReplicas int, numRdonly int, tabletIDBase int) error {
arrNames := strings.Split(names, ",")
fmt.Printf("Addshards got %d shards with %+v\n", len(arrNames), arrNames)
isSharded := len(arrNames) > 1
masterTabletUID := 0
for ind, shardName := range arrNames {
if _, ok := keyspace.Shards[shardName]; ok {
fmt.Printf("Shard %s already exists, not adding\n", shardName)
continue
}
tabletID := tabletIDBase + ind*100
tabletIndex := 0
dbProcesses := make([]*exec.Cmd, 0)
tablets := make([]*Tablet, 0)

fmt.Printf("Adding Shard %s\n", shardName)
if err := vc.VtctlClient.ExecuteCommand("CreateShard", keyspace.Name+"/"+shardName); err != nil {
t.Fatalf("CreateShard command failed with %+v\n", err)
}

shard := &Shard{Name: shardName, IsSharded: isSharded, Tablets: make(map[string]*Tablet, 1)}
fmt.Println("Adding Master tablet")
master, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
}
assert.NotNil(t, master)
tabletIndex++
master.Vttablet.VreplicationTabletType = "MASTER"
tablets = append(tablets, master)
dbProcesses = append(dbProcesses, proc)
for i := 0; i < numReplicas; i++ {
fmt.Println("Adding Replica tablet")
tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
if _, ok := keyspace.Shards[shardName]; ok {
fmt.Printf("Shard %s already exists, not adding\n", shardName)
} else {
fmt.Printf("Adding Shard %s\n", shardName)
if err := vc.VtctlClient.ExecuteCommand("CreateShard", keyspace.Name+"/"+shardName); err != nil {
t.Fatalf("CreateShard command failed with %+v\n", err)
}
assert.NotNil(t, tablet)
tabletIndex++
tablets = append(tablets, tablet)
dbProcesses = append(dbProcesses, proc)
keyspace.Shards[shardName] = shard
}
for i := 0; i < numRdonly; i++ {
fmt.Println("Adding RdOnly tablet")
tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "rdonly", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
for i, cell := range cells {
dbProcesses := make([]*exec.Cmd, 0)
tablets := make([]*Tablet, 0)
if i == 0 {
// only add master tablet for first cell, so first time CreateShard is called
fmt.Println("Adding Master tablet")
master, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
}
assert.NotNil(t, master)
tabletIndex++
master.Vttablet.VreplicationTabletType = "MASTER"
tablets = append(tablets, master)
dbProcesses = append(dbProcesses, proc)
masterTabletUID = master.Vttablet.TabletUID
}
assert.NotNil(t, tablet)
tabletIndex++
tablets = append(tablets, tablet)
dbProcesses = append(dbProcesses, proc)
}

keyspace.Shards[shardName] = shard
for ind, proc := range dbProcesses {
fmt.Printf("Waiting for mysql process for tablet %s\n", tablets[ind].Name)
if err := proc.Wait(); err != nil {
t.Fatalf("%v :: Unable to start mysql server for %v", err, tablets[ind].Vttablet)
for i := 0; i < numReplicas; i++ {
fmt.Println("Adding Replica tablet")
tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
}
assert.NotNil(t, tablet)
tabletIndex++
tablets = append(tablets, tablet)
dbProcesses = append(dbProcesses, proc)
}
}
for ind, tablet := range tablets {
fmt.Printf("Creating vt_keyspace database for tablet %s\n", tablets[ind].Name)
if _, err := tablet.Vttablet.QueryTablet(fmt.Sprintf("create database vt_%s", keyspace.Name),
keyspace.Name, false); err != nil {
t.Fatalf("Unable to start create database vt_%s for tablet %v", keyspace.Name, tablet.Vttablet)
for i := 0; i < numRdonly; i++ {
fmt.Println("Adding RdOnly tablet")
tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "rdonly", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
}
assert.NotNil(t, tablet)
tabletIndex++
tablets = append(tablets, tablet)
dbProcesses = append(dbProcesses, proc)
}
fmt.Printf("Running Setup() for vttablet %s\n", tablets[ind].Name)
if err := tablet.Vttablet.Setup(); err != nil {
t.Fatalf(err.Error())

for ind, proc := range dbProcesses {
fmt.Printf("Waiting for mysql process for tablet %s\n", tablets[ind].Name)
if err := proc.Wait(); err != nil {
t.Fatalf("%v :: Unable to start mysql server for %v", err, tablets[ind].Vttablet)
}
}
for ind, tablet := range tablets {
fmt.Printf("Creating vt_keyspace database for tablet %s\n", tablets[ind].Name)
if _, err := tablet.Vttablet.QueryTablet(fmt.Sprintf("create database vt_%s", keyspace.Name),
keyspace.Name, false); err != nil {
t.Fatalf("Unable to start create database vt_%s for tablet %v", keyspace.Name, tablet.Vttablet)
}
fmt.Printf("Running Setup() for vttablet %s\n", tablets[ind].Name)
if err := tablet.Vttablet.Setup(); err != nil {
t.Fatalf(err.Error())
}
}
}
fmt.Printf("InitShardMaster for %d\n", master.Vttablet.TabletUID)
err = vc.VtctlClient.InitShardMaster(keyspace.Name, shardName, cell.Name, master.Vttablet.TabletUID)
if err != nil {
t.Fatal(err.Error())
}
require.NotEqual(t, 0, masterTabletUID, "Should have created a master tablet")
fmt.Printf("InitShardMaster for %d\n", masterTabletUID)
require.NoError(t, vc.VtctlClient.InitShardMaster(keyspace.Name, shardName, cells[0].Name, masterTabletUID))
fmt.Printf("Finished creating shard %s\n", shard.Name)
}
return nil
}

// DeleteShard deletes a shard
func (vc *VitessCluster) DeleteShard(t *testing.T, cellName string, ksName string, shardName string) {
shard := vc.Cells[cellName].Keyspaces[ksName].Shards[shardName]
assert.NotNil(t, shard)
Expand All @@ -291,13 +310,13 @@ func (vc *VitessCluster) DeleteShard(t *testing.T, cellName string, ksName strin
}

// StartVtgate starts a vtgate process
func (vc *VitessCluster) StartVtgate(t *testing.T, cell *Cell) {
func (vc *VitessCluster) StartVtgate(t *testing.T, cell *Cell, cellsToWatch string) {
vtgate := cluster.VtgateProcessInstance(
globalConfig.vtgatePort,
globalConfig.vtgateGrpcPort,
globalConfig.vtgateMySQLPort,
cell.Name,
cell.Name,
cellsToWatch,
globalConfig.hostname,
globalConfig.tabletTypes,
globalConfig.topoPort,
Expand Down Expand Up @@ -405,12 +424,13 @@ func (vc *VitessCluster) execTabletQuery(vttablet *cluster.VttabletProcess, quer
Uname: "vt_dba",
}
ctx := context.Background()
if conn, err := mysql.Connect(ctx, &vtParams); err != nil {
var conn *mysql.Conn
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
return nil, err
} else {
qr, err := conn.ExecuteFetch(query, 1000, true)
return qr, err
}
qr, err := conn.ExecuteFetch(query, 1000, true)
return qr, err
}

func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName string, tabletType string) map[string]*cluster.VttabletProcess {
Expand Down
Loading