Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type LocalProcessCluster struct {
VtgateProcess VtgateProcess
VtworkerProcess VtworkerProcess
VtbackupProcess VtbackupProcess
VtorcProcess *VtorcProcess
VtorcProcesses []*VtorcProcess

nextPortForProcess int

Expand Down Expand Up @@ -726,8 +726,8 @@ func (cluster *LocalProcessCluster) Teardown() {
log.Errorf("Error in vtgate teardown: %v", err)
}

if cluster.VtorcProcess != nil {
if err := cluster.VtorcProcess.TearDown(); err != nil {
for _, vtorcProcess := range cluster.VtorcProcesses {
if err := vtorcProcess.TearDown(); err != nil {
log.Errorf("Error in vtorc teardown: %v", err)
}
}
Expand Down Expand Up @@ -952,13 +952,14 @@ func (cluster *LocalProcessCluster) NewVttabletInstance(tabletType string, UID i
}

// NewOrcProcess creates a new VtorcProcess object
func (cluster *LocalProcessCluster) NewOrcProcess(configFile string) *VtorcProcess {
func (cluster *LocalProcessCluster) NewOrcProcess(config VtorcConfiguration) *VtorcProcess {
base := VtctlProcessInstance(cluster.TopoProcess.Port, cluster.Hostname)
base.Binary = "vtorc"
return &VtorcProcess{
VtctlProcess: *base,
LogDir: cluster.TmpDirectory,
Config: configFile,
Config: config,
WebPort: cluster.GetAndReservePort(),
}
}

Expand Down
67 changes: 60 additions & 7 deletions go/test/endtoend/cluster/vtorc_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package cluster

import (
"encoding/json"
"fmt"
"os"
"os/exec"
Expand All @@ -33,16 +34,68 @@ import (
// vtorc as a separate process for testing
type VtorcProcess struct {
VtctlProcess
LogDir string
ExtraArgs []string
Config string
proc *exec.Cmd
exit chan error
LogDir string
ExtraArgs []string
ConfigPath string
Config VtorcConfiguration
WebPort int
proc *exec.Cmd
exit chan error
}

// VtorcConfiguration holds the vtorc settings that get serialized to a JSON
// config file (ToJSONString) and handed to the vtorc binary via its --config
// flag. Untagged fields are always emitted; fields tagged ",omitempty" are
// emitted only when set to a non-zero value, so tests can opt into them
// selectively.
type VtorcConfiguration struct {
	Debug         bool
	ListenAddress string // HTTP listen address, e.g. ":15000"; filled in by AddDefaults from the reserved web port
	// MySQL credentials vtorc uses for topology discovery and for
	// configuring replication on replicas.
	MySQLTopologyUser          string
	MySQLTopologyPassword      string
	MySQLReplicaUser           string
	MySQLReplicaPassword       string
	RecoveryPeriodBlockSeconds int // block window between recoveries; kept at 1s in tests
	InstancePollSeconds        int // instance poll interval; kept at 1s in tests so repairs are observed quickly
	// Optional settings; omitted from the JSON file unless set.
	PreventCrossDataCenterPrimaryFailover bool   `json:",omitempty"`
	LockShardTimeoutSeconds               int    `json:",omitempty"`
	Durability                            string `json:",omitempty"`
	ReplicationLagQuery                   string `json:",omitempty"`
	FailPrimaryPromotionOnLagMinutes      int    `json:",omitempty"`
}

// ToJSONString will marshal this configuration as JSON, indented with tabs
// for readability in test logs and on-disk config files.
func (config *VtorcConfiguration) ToJSONString() string {
	// The marshal error is deliberately ignored: the struct contains only
	// plain bool/int/string fields, which cannot fail to encode.
	encoded, _ := json.MarshalIndent(config, "", "\t")
	return string(encoded)
}

// AddDefaults fills in the standard settings every test vtorc instance uses:
// the web listen address (from the reserved port), verbose logging, the
// MySQL topology/replica users, and 1-second poll/recovery intervals so
// end-to-end tests observe repairs quickly.
func (config *VtorcConfiguration) AddDefaults(webPort int) {
	config.ListenAddress = fmt.Sprintf(":%d", webPort)
	config.Debug = true
	// Users vtorc connects with; these match the accounts provisioned for
	// the end-to-end test clusters.
	config.MySQLTopologyUser = "orc_client_user"
	config.MySQLTopologyPassword = "orc_client_user_password"
	config.MySQLReplicaUser = "vt_repl"
	config.MySQLReplicaPassword = ""
	// Aggressive timings keep test runtimes short.
	config.InstancePollSeconds = 1
	config.RecoveryPeriodBlockSeconds = 1
}

// Setup starts orc process with required arguments
func (orc *VtorcProcess) Setup() (err error) {

// create the configuration file
timeNow := time.Now().UnixNano()
configFile, _ := os.Create(path.Join(orc.LogDir, fmt.Sprintf("orc-config-%d.json", timeNow)))
orc.ConfigPath = configFile.Name()

// Add the default configurations and print them out
orc.Config.AddDefaults(orc.WebPort)
log.Errorf("configuration - %v", orc.Config.ToJSONString())
_, err = configFile.WriteString(orc.Config.ToJSONString())
if err != nil {
return err
}
err = configFile.Close()
if err != nil {
return err
}

/* minimal command line arguments:
$ vtorc -topo_implementation etcd2 -topo_global_server_address localhost:2379 -topo_global_root /vitess/global
-config config/orchestrator/default.json -alsologtostderr http
Expand All @@ -52,7 +105,7 @@ func (orc *VtorcProcess) Setup() (err error) {
"--topo_implementation", orc.TopoImplementation,
"--topo_global_server_address", orc.TopoGlobalAddress,
"--topo_global_root", orc.TopoGlobalRoot,
"--config", orc.Config,
"--config", orc.ConfigPath,
"--orc_web_dir", path.Join(os.Getenv("VTROOT"), "web", "orchestrator"),
)
if *isCoverage {
Expand All @@ -62,7 +115,7 @@ func (orc *VtorcProcess) Setup() (err error) {
orc.proc.Args = append(orc.proc.Args, orc.ExtraArgs...)
orc.proc.Args = append(orc.proc.Args, "--alsologtostderr", "http")

errFile, _ := os.Create(path.Join(orc.LogDir, fmt.Sprintf("orc-stderr-%d.txt", time.Now().UnixNano())))
errFile, _ := os.Create(path.Join(orc.LogDir, fmt.Sprintf("orc-stderr-%d.txt", timeNow)))
orc.proc.Stderr = errFile

orc.proc.Env = append(orc.proc.Env, os.Environ()...)
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vtorc/general/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func TestMain(m *testing.M) {
if clusterInfo != nil {
// stop vtorc first otherwise its logs get polluted
// with instances being unreachable triggering unnecessary operations
if clusterInfo.ClusterInstance.VtorcProcess != nil {
_ = clusterInfo.ClusterInstance.VtorcProcess.TearDown()
for _, vtorcProcess := range clusterInfo.ClusterInstance.VtorcProcesses {
_ = vtorcProcess.TearDown()
}

for _, cellInfo := range clusterInfo.CellInfos {
Expand Down
103 changes: 88 additions & 15 deletions go/test/endtoend/vtorc/general/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,31 @@ import (
)

// Cases to test:
// 1. create cluster with 1 replica and 1 rdonly, let orc choose primary
// 1. create cluster with 2 replicas and 1 rdonly, let orc choose primary
// verify rdonly is not elected, only replica
// verify replication is setup
// verify that with multiple vtorc instances, we still only have 1 PlannedReparentShard call
func TestPrimaryElection(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, nil, "test_config.json")
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 2)
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

utils.CheckPrimaryTablet(t, clusterInfo, shard0.Vttablets[0], true)
utils.CheckReplication(t, clusterInfo, shard0.Vttablets[0], shard0.Vttablets[1:], 10*time.Second)
primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)
assert.NotNil(t, primary, "should have elected a primary")
utils.CheckReplication(t, clusterInfo, primary, shard0.Vttablets, 10*time.Second)

for _, vttablet := range shard0.Vttablets {
if vttablet.Type == "rdonly" && primary.Alias == vttablet.Alias {
t.Errorf("Rdonly tablet promoted as primary - %v", primary.Alias)
}
}

res, err := utils.RunSQL(t, "select * from reparent_journal", primary, "_vt")
require.NoError(t, err)
require.Len(t, res.Rows, 1, "There should only be 1 primary tablet which was elected")
}

// Cases to test:
Expand All @@ -51,7 +65,9 @@ func TestPrimaryElection(t *testing.T) {
// verify replication is setup
func TestSingleKeyspace(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, []string{"--clusters_to_watch", "ks"}, "test_config.json")
utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, []string{"--clusters_to_watch", "ks"}, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand All @@ -65,7 +81,9 @@ func TestSingleKeyspace(t *testing.T) {
// verify replication is setup
func TestKeyspaceShard(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, []string{"--clusters_to_watch", "ks/0"}, "test_config.json")
utils.SetupVttabletsAndVtorc(t, clusterInfo, 1, 1, []string{"--clusters_to_watch", "ks/0"}, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down Expand Up @@ -95,7 +113,9 @@ func waitForReadOnlyValue(t *testing.T, curPrimary *cluster.Vttablet, expectValu
// 3. make primary readonly, let orc repair
func TestPrimaryReadOnly(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, "test_config.json")
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand All @@ -115,7 +135,9 @@ func TestPrimaryReadOnly(t *testing.T) {
// 4. make replica ReadWrite, let orc repair
func TestReplicaReadWrite(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, "test_config.json")
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down Expand Up @@ -143,7 +165,9 @@ func TestReplicaReadWrite(t *testing.T) {
// 5. stop replication, let orc repair
func TestStopReplication(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, "test_config.json")
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down Expand Up @@ -175,7 +199,9 @@ func TestStopReplication(t *testing.T) {
// 6. setup replication from non-primary, let orc repair
func TestReplicationFromOtherReplica(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 3, 0, nil, "test_config.json")
utils.SetupVttabletsAndVtorc(t, clusterInfo, 3, 0, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down Expand Up @@ -221,7 +247,9 @@ func TestRepairAfterTER(t *testing.T) {
// test fails intermittently on CI, skip until it can be fixed.
t.SkipNow()
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, "test_config.json")
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down Expand Up @@ -252,7 +280,9 @@ func TestRepairAfterTER(t *testing.T) {
// 7. make instance A replicates from B and B from A, wait for repair
func TestCircularReplication(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, "test_config.json")
utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 0, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]

Expand Down Expand Up @@ -292,11 +322,14 @@ func TestCircularReplication(t *testing.T) {
// TestSemiSync tests that semi-sync is setup correctly by vtorc if it is incorrectly set
func TestSemiSync(t *testing.T) {
// stop any vtorc instance running due to a previous test.
utils.StopVtorc(t, clusterInfo)
utils.StopVtorcs(t, clusterInfo)
newCluster := utils.SetupNewClusterSemiSync(t)
utils.StartVtorc(t, newCluster, nil, "test_config_semi_sync.json")
utils.StartVtorcs(t, newCluster, nil, cluster.VtorcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
Durability: "semi_sync",
}, 1)
defer func() {
utils.StopVtorc(t, newCluster)
utils.StopVtorcs(t, newCluster)
newCluster.ClusterInstance.Teardown()
}()
keyspace := &newCluster.ClusterInstance.Keyspaces[0]
Expand Down Expand Up @@ -353,3 +386,43 @@ func TestSemiSync(t *testing.T) {
}
}
}

// TestVtorcWithPrs tests that VTOrc works fine even when PRS is called from vtctld
func TestVtorcWithPrs(t *testing.T) {
	defer cluster.PanicHandler(t)
	utils.SetupVttabletsAndVtorc(t, clusterInfo, 4, 0, nil, cluster.VtorcConfiguration{
		PreventCrossDataCenterPrimaryFailover: true,
	}, 1)
	keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
	shard0 := &keyspace.Shards[0]

	// Ask the topo server which tablet vtorc elected as primary.
	curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)
	assert.NotNil(t, curPrimary, "should have elected a primary")

	// Pick the first tablet that is not the current primary as the
	// reparent target.
	var promoteTarget *cluster.Vttablet
	for _, candidate := range shard0.Vttablets {
		if candidate.Alias == curPrimary.Alias {
			continue
		}
		promoteTarget = candidate
		break
	}
	assert.NotNil(t, promoteTarget, "could not find any replica tablet")

	// Replication must be healthy before we attempt the failover.
	utils.CheckReplication(t, clusterInfo, curPrimary, shard0.Vttablets, 10*time.Second)

	// Trigger a planned reparent from vtctld, outside of vtorc's control.
	output, err := clusterInfo.ClusterInstance.VtctlclientProcess.ExecuteCommandWithOutput(
		"PlannedReparentShard",
		"-keyspace_shard", fmt.Sprintf("%s/%s", keyspace.Name, shard0.Name),
		"-wait_replicas_timeout", "31s",
		"-new_primary", promoteTarget.Alias)
	require.NoError(t, err, "error in PlannedReparentShard output - %s", output)

	// Give vtorc time to notice the externally-driven reparent and settle
	// without fighting it.
	time.Sleep(40 * time.Second)

	// The PRS target must now be primary, and writes must replicate.
	utils.CheckPrimaryTablet(t, clusterInfo, promoteTarget, true)
	utils.VerifyWritesSucceed(t, clusterInfo, promoteTarget, shard0.Vttablets, 10*time.Second)
}
Loading