272 changes: 226 additions & 46 deletions go/test/endtoend/orchestrator/orc_test.go
@@ -97,6 +97,25 @@ func createCluster(t *testing.T, numReplicas int, numRdonly int) *cluster.LocalProcessCluster {
require.NoError(t, err)
}

for _, tablet := range shard0.Vttablets {
// Reset status, don't wait for the tablet status. We will check it later
tablet.VttabletProcess.ServingStatus = ""

// Start the tablet
err := tablet.VttabletProcess.Setup()
require.NoError(t, err)
}

for _, tablet := range shard0.Vttablets {
err := tablet.VttabletProcess.WaitForTabletTypes([]string{"SERVING", "NOT_SERVING"})
require.NoError(t, err)
}

// Start orchestrator
clusterInstance.OrcProcess = clusterInstance.NewOrcProcess(path.Join(os.Getenv("PWD"), "test_config.json"))
err = clusterInstance.OrcProcess.Setup()
require.NoError(t, err)

return clusterInstance
}

@@ -107,101 +126,243 @@ func createCluster(t *testing.T, numReplicas int, numRdonly int) *cluster.LocalProcessCluster {
func TestMasterElection(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := createCluster(t, 1, 1)
keyspace := &clusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
defer func() {
clusterInstance.Teardown()
killTablets(t, shard0)
}()

//log.Exitf("error")
checkMasterTablet(t, clusterInstance, shard0.Vttablets[0])
checkReplication(t, clusterInstance, shard0.Vttablets[0], shard0.Vttablets[1:])
}

// 2. bring down master, let orc promote replica
func TestDownMaster(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := createCluster(t, 2, 0)
keyspace := &clusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
for _, tablet := range shard0.Vttablets {
// Reset status, don't wait for the tablet status. We will check it later
tablet.VttabletProcess.ServingStatus = ""
defer func() {
clusterInstance.Teardown()
killTablets(t, shard0)
}()
// find master from topo
curMaster := shardMasterTablet(t, clusterInstance, keyspace, shard0)
assert.NotNil(t, curMaster, "should have elected a master")

// Start the tablet
err := tablet.VttabletProcess.Setup()
require.NoError(t, err)
// Make the current master database unavailable.
err := curMaster.MysqlctlProcess.Stop()
require.NoError(t, err)

for _, tablet := range shard0.Vttablets {
Contributor: where's the Sleep() now?

Collaborator (Author): It is inside the call to checkMasterTablet

// we know we have only two tablets, so the "other" one must be the new master
if tablet.Alias != curMaster.Alias {
checkMasterTablet(t, clusterInstance, tablet)
break
}
}
}

// 3. make master readonly, let orc repair
func TestMasterReadOnly(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := createCluster(t, 2, 0)
keyspace := &clusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
defer func() {
clusterInstance.Teardown()
// Kill tablets
killTablets(t, shard0)
}()

for _, tablet := range shard0.Vttablets {
err := tablet.VttabletProcess.WaitForTabletTypes([]string{"SERVING", "NOT_SERVING"})
require.NoError(t, err)
}
// find master from topo
curMaster := shardMasterTablet(t, clusterInstance, keyspace, shard0)
assert.NotNil(t, curMaster, "should have elected a master")

// Start orchestrator
clusterInstance.OrcProcess = clusterInstance.NewOrcProcess(path.Join(os.Getenv("PWD"), "test_config.json"))
err := clusterInstance.OrcProcess.Setup()
// TODO(deepthi): we should not need to do this, the DB should be created automatically
_, err := curMaster.VttabletProcess.QueryTablet(fmt.Sprintf("create database IF NOT EXISTS vt_%s", keyspace.Name), keyspace.Name, false)
require.NoError(t, err)

//log.Exitf("error")
checkMasterTablet(t, clusterInstance, shard0.Vttablets[0])

validateTopology(t, clusterInstance, true)

// create tables, insert data and make sure it is replicated correctly
sqlSchema := `
create table vt_insert_test (
id bigint,
msg varchar(64),
primary key (id)
) Engine=InnoDB
`
runSQL(t, sqlSchema, shard0.Vttablets[0])
confirmReplication(t, shard0.Vttablets[0], []*cluster.Vttablet{shard0.Vttablets[1]})
// Make the current master database read-only.
runSQL(t, "set global read_only=ON", curMaster)

// wait for repair
// TODO(deepthi): wait for condition instead of sleep
time.Sleep(15 * time.Second)
qr := runSQL(t, "select @@global.read_only", curMaster)
require.NotNil(t, qr)
require.Equal(t, 1, len(qr.Rows))
require.Equal(t, "[[INT64(0)]]", fmt.Sprintf("%s", qr.Rows), qr.Rows)
Contributor: 👍

}
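
The TODO above ("wait for condition instead of sleep") could be addressed with a small polling helper. A minimal sketch, assuming the same runSQL helper and testify assertions used elsewhere in this file; the name waitForReadOnlyValue and the poll interval are illustrative, not part of this PR:

	// waitForReadOnlyValue polls @@global.read_only on the given tablet until the
	// result renders as the expected string (e.g. "[[INT64(0)]]") or the timeout
	// expires. Hypothetical helper, not part of this PR.
	func waitForReadOnlyValue(t *testing.T, tablet *cluster.Vttablet, want string, timeout time.Duration) {
		deadline := time.Now().Add(timeout)
		for time.Now().Before(deadline) {
			qr := runSQL(t, "select @@global.read_only", tablet)
			if qr != nil && len(qr.Rows) == 1 && fmt.Sprintf("%s", qr.Rows) == want {
				return
			}
			time.Sleep(500 * time.Millisecond)
		}
		assert.FailNow(t, fmt.Sprintf("timed out waiting for @@global.read_only to render as %s", want))
	}

With such a helper, the sleep-then-assert sequence above could become waitForReadOnlyValue(t, curMaster, "[[INT64(0)]]", 15*time.Second).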

// 2. bring down master, let orc promote replica
func TestDownMaster(t *testing.T) {
// 4. make replica ReadWrite, let orc repair
func TestReplicaReadWrite(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := createCluster(t, 2, 0)
keyspace := &clusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
defer func() {
clusterInstance.Teardown()
// Kill tablets
killTablets(t, shard0)
}()

// find master from topo
curMaster := shardMasterTablet(t, clusterInstance, keyspace, shard0)
assert.NotNil(t, curMaster, "should have elected a master")

// TODO(deepthi): we should not need to do this, the DB should be created automatically
_, err := curMaster.VttabletProcess.QueryTablet(fmt.Sprintf("create database IF NOT EXISTS vt_%s", keyspace.Name), keyspace.Name, false)
require.NoError(t, err)

var replica *cluster.Vttablet
for _, tablet := range shard0.Vttablets {
// we know we have only two tablets, so the "other" one must be the replica
if tablet.Alias != curMaster.Alias {
replica = tablet
break
}
}
// Make the replica database read-write.
runSQL(t, "set global read_only=OFF", replica)

// wait for repair
// TODO(deepthi): wait for condition instead of sleep
time.Sleep(15 * time.Second)
qr := runSQL(t, "select @@global.read_only", replica)
require.NotNil(t, qr)
require.Equal(t, 1, len(qr.Rows))
require.Equal(t, "[[INT64(1)]]", fmt.Sprintf("%s", qr.Rows), qr.Rows)
}
Contributor: 👍


// 5. stop replication, let orc repair
func TestStopReplication(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := createCluster(t, 2, 0)
keyspace := &clusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
for _, tablet := range shard0.Vttablets {
// Reset status, don't wait for the tablet status. We will check it later
tablet.VttabletProcess.ServingStatus = ""
defer func() {
clusterInstance.Teardown()
// Kill tablets
killTablets(t, shard0)
}()

// Start the tablet
err := tablet.VttabletProcess.Setup()
require.NoError(t, err)
// find master from topo
curMaster := shardMasterTablet(t, clusterInstance, keyspace, shard0)
assert.NotNil(t, curMaster, "should have elected a master")

// TODO(deepthi): we should not need to do this, the DB should be created automatically
_, err := curMaster.VttabletProcess.QueryTablet(fmt.Sprintf("create database IF NOT EXISTS vt_%s", keyspace.Name), keyspace.Name, false)
require.NoError(t, err)

var replica *cluster.Vttablet
for _, tablet := range shard0.Vttablets {
// we know we have only two tablets, so the "other" one must be the replica
if tablet.Alias != curMaster.Alias {
replica = tablet
break
}
}
require.NotNil(t, replica, "should be able to find a replica")
// use vtctlclient to stop replication
_, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("StopReplication", replica.Alias)
require.NoError(t, err)

// wait for repair
time.Sleep(15 * time.Second)
// check replication is setup correctly
checkReplication(t, clusterInstance, curMaster, []*cluster.Vttablet{replica})
Contributor: I feel like these tests share a lot of code in common, the code should probably be refactored to provide a framework for setup-destroy-wait-expect cycle.

Collaborator (Author): I moved as much as possible into createCluster and checkReplication. What is left is slightly different in each test case. As we add more tests we can revisit and refactor.

}
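
The setup-destroy-wait-expect cycle the reviewer suggests above could look something like this. A minimal sketch, assuming the helpers already defined in this file (createCluster, shardMasterTablet, killTablets); the names orcTestCase and runOrcTestCase are illustrative, not part of this PR:

	// orcTestCase describes one orchestrator-repair scenario:
	// set up a cluster, break something, wait, verify the repair.
	// Hypothetical sketch, not part of this PR.
	type orcTestCase struct {
		replicas int
		rdonly   int
		destroy  func(t *testing.T, master *cluster.Vttablet) // break something
		expect   func(t *testing.T, master *cluster.Vttablet) // verify orc repaired it
	}

	func runOrcTestCase(t *testing.T, tc orcTestCase) {
		defer cluster.PanicHandler(t)
		clusterInstance := createCluster(t, tc.replicas, tc.rdonly)
		keyspace := &clusterInstance.Keyspaces[0]
		shard0 := &keyspace.Shards[0]
		defer func() {
			clusterInstance.Teardown()
			killTablets(t, shard0)
		}()

		curMaster := shardMasterTablet(t, clusterInstance, keyspace, shard0)
		assert.NotNil(t, curMaster, "should have elected a master")

		tc.destroy(t, curMaster)     // e.g. runSQL(t, "set global read_only=ON", master)
		time.Sleep(15 * time.Second) // wait for orchestrator to notice and repair
		tc.expect(t, curMaster)      // e.g. assert read_only is back to OFF
	}

Each existing test would then shrink to an orcTestCase literal plus whatever scenario-specific wiring (extra replicas, vtctlclient calls) does not fit the common shape.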

// 6. setup replication from non-master, let orc repair
func TestReplicationFromOtherReplica(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := createCluster(t, 3, 0)
keyspace := &clusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
defer func() {
clusterInstance.Teardown()
// Kill tablets
killTablets(t, shard0)
}()

// find master from topo
curMaster := shardMasterTablet(t, clusterInstance, keyspace, shard0)
assert.NotNil(t, curMaster, "should have elected a master")

// TODO(deepthi): we should not need to do this, the DB should be created automatically
_, err := curMaster.VttabletProcess.QueryTablet(fmt.Sprintf("create database IF NOT EXISTS vt_%s", keyspace.Name), keyspace.Name, false)
require.NoError(t, err)

var replica, otherReplica *cluster.Vttablet
for _, tablet := range shard0.Vttablets {
err := tablet.VttabletProcess.WaitForTabletTypes([]string{"SERVING", "NOT_SERVING"})
require.NoError(t, err)
// tablets other than the current master are replicas; pick the first two
if tablet.Alias != curMaster.Alias {
if replica == nil {
replica = tablet
} else {
otherReplica = tablet
}
}
}
require.NotNil(t, replica, "should be able to find a replica")
require.NotNil(t, otherReplica, "should be able to find 2nd replica")

// Start orchestrator
clusterInstance.OrcProcess = clusterInstance.NewOrcProcess(path.Join(os.Getenv("PWD"), "test_config.json"))
err := clusterInstance.OrcProcess.Setup()
require.NoError(t, err)
// point replica at otherReplica
// Get master position
hostname := "localhost"
_, gtid := cluster.GetMasterPosition(t, *otherReplica, hostname)

changeMasterCommand := fmt.Sprintf("STOP SLAVE; RESET MASTER; SET GLOBAL gtid_purged = '%s';"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1; START SLAVE", gtid, hostname, otherReplica.MySQLPort)
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", replica.Alias, changeMasterCommand)
require.NoError(t, err, result)

// wait for repair
time.Sleep(15 * time.Second)
// check replication is setup correctly
checkReplication(t, clusterInstance, curMaster, []*cluster.Vttablet{replica, otherReplica})
}
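
The changeMasterCommand above repoints replica at otherReplica using MySQL GTID auto-positioning: RESET MASTER clears the replica's own GTID history, SET GLOBAL gtid_purged marks the source's executed GTIDs as already applied, and MASTER_AUTO_POSITION = 1 makes the replica request everything after that set. Wrapped as a helper it might look like the following sketch; repointReplica is illustrative, not part of this PR:

	// repointReplica points replica at source using GTID auto-positioning,
	// mirroring the inline changeMasterCommand above. Hypothetical helper.
	func repointReplica(t *testing.T, clusterInstance *cluster.LocalProcessCluster, replica, source *cluster.Vttablet) {
		// Capture the source's current GTID set so the replica can start from it.
		_, gtid := cluster.GetMasterPosition(t, *source, "localhost")
		sql := fmt.Sprintf("STOP SLAVE; RESET MASTER; SET GLOBAL gtid_purged = '%s';"+
			"CHANGE MASTER TO MASTER_HOST='localhost', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1; START SLAVE",
			gtid, source.MySQLPort)
		result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", replica.Alias, sql)
		require.NoError(t, err, result)
	}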

func TestRepairAfterTER(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := createCluster(t, 2, 0)
keyspace := &clusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
defer func() {
clusterInstance.Teardown()
// Kill tablets
killTablets(t, shard0)
}()

// find master from topo
curMaster := shardMasterTablet(t, clusterInstance, keyspace, shard0)
assert.NotNil(t, curMaster, "should have elected a master")

// Make the current master database unavailable.
err = curMaster.MysqlctlProcess.Stop()
// TODO(deepthi): we should not need to do this, the DB should be created automatically
_, err := curMaster.VttabletProcess.QueryTablet(fmt.Sprintf("create database IF NOT EXISTS vt_%s", keyspace.Name), keyspace.Name, false)
require.NoError(t, err)

var newMaster *cluster.Vttablet
for _, tablet := range shard0.Vttablets {
// we know we have only two tablets, so the "other" one must be the new master
if tablet.Alias != curMaster.Alias {
checkMasterTablet(t, clusterInstance, tablet)
newMaster = tablet
break
}
}

// TER to other tablet
_, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("TabletExternallyReparented", newMaster.Alias)
require.NoError(t, err)

// wait for repair
// TODO(deepthi): wait for condition instead of sleep
time.Sleep(15 * time.Second)

checkReplication(t, clusterInstance, newMaster, []*cluster.Vttablet{curMaster})
}

func shardMasterTablet(t *testing.T, cluster *cluster.LocalProcessCluster, keyspace *cluster.Keyspace, shard *cluster.Shard) *cluster.Vttablet {
@@ -236,6 +397,7 @@ func checkMasterTablet(t *testing.T, cluster *cluster.LocalProcessCluster, tablet *cluster.Vttablet) {
for {
now := time.Now()
if now.Sub(start) > time.Second*60 {
//log.Exitf("error")
assert.FailNow(t, "failed to elect master before timeout")
}
result, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("GetTablet", tablet.Alias)
@@ -258,7 +420,9 @@ func checkMasterTablet(t *testing.T, cluster *cluster.LocalProcessCluster, tablet *cluster.Vttablet) {

err = json2.Unmarshal([]byte(result), &streamHealthResponse)
require.NoError(t, err)

//if !streamHealthResponse.GetServing() {
// log.Exitf("stream health not updated")
//}
assert.True(t, streamHealthResponse.GetServing(), "stream health: %v", streamHealthResponse)
tabletType := streamHealthResponse.GetTarget().GetTabletType()
require.Equal(t, topodatapb.TabletType_MASTER, tabletType)
Expand All @@ -267,7 +431,23 @@ func checkMasterTablet(t *testing.T, cluster *cluster.LocalProcessCluster, table
}
}

func checkReplication(t *testing.T, clusterInstance *cluster.LocalProcessCluster, master *cluster.Vttablet, replicas []*cluster.Vttablet) {
validateTopology(t, clusterInstance, true)

// create tables, insert data and make sure it is replicated correctly
sqlSchema := `
create table vt_insert_test (
id bigint,
msg varchar(64),
primary key (id)
) Engine=InnoDB
`
runSQL(t, sqlSchema, master)
confirmReplication(t, master, replicas)
}

func confirmReplication(t *testing.T, master *cluster.Vttablet, replicas []*cluster.Vttablet) {
log.Infof("Insert data into master and check that it is replicated to replica")
n := 2 // random value ...
// insert data into the new master, check the connected replica work
insertSQL := fmt.Sprintf("insert into vt_insert_test(id, msg) values (%d, 'test %d')", n, n)