Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,7 @@ func (cluster *LocalProcessCluster) NewVTOrcProcess(config VTOrcConfiguration) *
LogDir: cluster.TmpDirectory,
Config: config,
WebPort: cluster.GetAndReservePort(),
Port: cluster.GetAndReservePort(),
}
}

Expand Down
25 changes: 25 additions & 0 deletions go/test/endtoend/cluster/vtorc_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package cluster
import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path"
Expand All @@ -34,6 +36,7 @@ import (
// vtorc as a separate process for testing
type VTOrcProcess struct {
VtctlProcess
Port int
LogDir string
ExtraArgs []string
ConfigPath string
Expand Down Expand Up @@ -107,6 +110,7 @@ func (orc *VTOrcProcess) Setup() (err error) {
"--topo_global_server_address", orc.TopoGlobalAddress,
"--topo_global_root", orc.TopoGlobalRoot,
"--config", orc.ConfigPath,
"--port", fmt.Sprintf("%d", orc.Port),
"--orc_web_dir", path.Join(os.Getenv("VTROOT"), "web", "vtorc"),
)
if *isCoverage {
Expand Down Expand Up @@ -157,3 +161,24 @@ func (orc *VTOrcProcess) TearDown() error {
return <-orc.exit
}
}

// GetVars gets the variables exported on the /debug/vars page of VTOrc.
//
// It issues an HTTP GET against http://localhost:<orc.Port>/debug/vars and
// decodes the JSON response body into a map. It returns nil when the request
// fails, when the response status is not 200 OK, when the body cannot be read
// completely, or when the body is not valid JSON for a map.
func (orc *VTOrcProcess) GetVars() map[string]any {
	varsURL := fmt.Sprintf("http://localhost:%d/debug/vars", orc.Port)
	resp, err := http.Get(varsURL)
	if err != nil {
		return nil
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil
	}

	respByte, err := io.ReadAll(resp.Body)
	if err != nil {
		// A partially read body must not be decoded; report "no vars" instead.
		return nil
	}

	resultMap := make(map[string]any)
	if err := json.Unmarshal(respByte, &resultMap); err != nil {
		return nil
	}
	return resultMap
}
20 changes: 20 additions & 0 deletions go/test/endtoend/vtorc/general/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/vtorc/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtorc/logic"
)

// Cases to test:
Expand Down Expand Up @@ -72,6 +73,7 @@ func TestSingleKeyspace(t *testing.T) {

utils.CheckPrimaryTablet(t, clusterInfo, shard0.Vttablets[0], true)
utils.CheckReplication(t, clusterInfo, shard0.Vttablets[0], shard0.Vttablets[1:], 10*time.Second)
utils.WaitForSuccessfulRecoveryCount(t, clusterInfo.ClusterInstance.VTOrcProcesses[0], logic.ElectNewPrimaryRecoveryName, 1)
}

// Cases to test:
Expand All @@ -88,6 +90,7 @@ func TestKeyspaceShard(t *testing.T) {

utils.CheckPrimaryTablet(t, clusterInfo, shard0.Vttablets[0], true)
utils.CheckReplication(t, clusterInfo, shard0.Vttablets[0], shard0.Vttablets[1:], 10*time.Second)
utils.WaitForSuccessfulRecoveryCount(t, clusterInfo.ClusterInstance.VTOrcProcesses[0], logic.ElectNewPrimaryRecoveryName, 1)
}

// Cases to test:
Expand All @@ -107,6 +110,8 @@ func TestVTOrcRepairs(t *testing.T) {
// find primary from topo
curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)
assert.NotNil(t, curPrimary, "should have elected a primary")
vtOrcProcess := clusterInfo.ClusterInstance.VTOrcProcesses[0]
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.ElectNewPrimaryRecoveryName, 1)

var replica, otherReplica *cluster.Vttablet
for _, tablet := range shard0.Vttablets {
Expand All @@ -133,6 +138,7 @@ func TestVTOrcRepairs(t *testing.T) {
// wait for repair
match := utils.WaitForReadOnlyValue(t, curPrimary, 0)
require.True(t, match)
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixPrimaryRecoveryName, 1)
})

t.Run("ReplicaReadWrite", func(t *testing.T) {
Expand All @@ -143,6 +149,7 @@ func TestVTOrcRepairs(t *testing.T) {
// wait for repair
match := utils.WaitForReadOnlyValue(t, replica, 1)
require.True(t, match)
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixReplicaRecoveryName, 1)
})

t.Run("StopReplication", func(t *testing.T) {
Expand All @@ -152,20 +159,23 @@ func TestVTOrcRepairs(t *testing.T) {

// check replication is setup correctly
utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{replica, otherReplica}, 15*time.Second)
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixReplicaRecoveryName, 2)

// Stop just the IO thread on the replica
_, err = utils.RunSQL(t, "STOP SLAVE IO_THREAD", replica, "")
require.NoError(t, err)

// check replication is setup correctly
utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{replica, otherReplica}, 15*time.Second)
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixReplicaRecoveryName, 3)

// Stop just the SQL thread on the replica
_, err = utils.RunSQL(t, "STOP SLAVE SQL_THREAD", replica, "")
require.NoError(t, err)

// check replication is setup correctly
utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{replica, otherReplica}, 15*time.Second)
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixReplicaRecoveryName, 4)
})

t.Run("ReplicationFromOtherReplica", func(t *testing.T) {
Expand All @@ -177,6 +187,7 @@ func TestVTOrcRepairs(t *testing.T) {

// wait until the source port is set back correctly by vtorc
utils.CheckSourcePort(t, replica, curPrimary, 15*time.Second)
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixReplicaRecoveryName, 5)

// check that writes succeed
utils.VerifyWritesSucceed(t, clusterInfo, curPrimary, []*cluster.Vttablet{replica, otherReplica}, 15*time.Second)
Expand All @@ -196,6 +207,7 @@ func TestVTOrcRepairs(t *testing.T) {
// wait for repair
err = utils.WaitForReplicationToStop(t, curPrimary)
require.NoError(t, err)
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.RecoverPrimaryHasPrimaryRecoveryName, 1)
// check that the writes still succeed
utils.VerifyWritesSucceed(t, clusterInfo, curPrimary, []*cluster.Vttablet{replica, otherReplica}, 10*time.Second)
})
Expand Down Expand Up @@ -314,6 +326,8 @@ func TestVTOrcWithPrs(t *testing.T) {
// find primary from topo
curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)
assert.NotNil(t, curPrimary, "should have elected a primary")
vtOrcProcess := clusterInfo.ClusterInstance.VTOrcProcesses[0]
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.ElectNewPrimaryRecoveryName, 1)

// find any replica tablet other than the current primary
var replica *cluster.Vttablet
Expand All @@ -339,6 +353,12 @@ func TestVTOrcWithPrs(t *testing.T) {

// check that the replica gets promoted
utils.CheckPrimaryTablet(t, clusterInfo, replica, true)
// Verify that VTOrc didn't run any other recovery
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.ElectNewPrimaryRecoveryName, 1)
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.RecoverDeadPrimaryRecoveryName, 0)
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixPrimaryRecoveryName, 0)
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixReplicaRecoveryName, 0)
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.RecoverPrimaryHasPrimaryRecoveryName, 0)
utils.VerifyWritesSucceed(t, clusterInfo, replica, shard0.Vttablets, 10*time.Second)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"vitess.io/vitess/go/test/endtoend/vtorc/utils"
"vitess.io/vitess/go/vt/vtorc/logic"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -40,6 +41,8 @@ func TestDownPrimary(t *testing.T) {
// find primary from topo
curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)
assert.NotNil(t, curPrimary, "should have elected a primary")
vtOrcProcess := clusterInfo.ClusterInstance.VTOrcProcesses[0]
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.ElectNewPrimaryRecoveryName, 1)

// find the replica and rdonly tablets
var replica, rdonly *cluster.Vttablet
Expand Down Expand Up @@ -70,6 +73,7 @@ func TestDownPrimary(t *testing.T) {
utils.CheckPrimaryTablet(t, clusterInfo, replica, true)
// also check that the replication is working correctly after failover
utils.VerifyWritesSucceed(t, clusterInfo, replica, []*cluster.Vttablet{rdonly}, 10*time.Second)
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.RecoverDeadPrimaryRecoveryName, 1)
}

// Failover should not be cross data centers, according to the configuration file
Expand Down
39 changes: 28 additions & 11 deletions go/test/endtoend/vtorc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,20 +194,15 @@ func createVttablets(clusterInstance *cluster.LocalProcessCluster, cellInfos []*

// shutdownVttablets shuts down all the vttablets and removes them from the topology
func shutdownVttablets(clusterInfo *VTOrcClusterInfo) error {
// demote the primary tablet if there is
err := demotePrimaryTablet(clusterInfo.Ts)
// reset the shard primary
err := resetShardPrimary(clusterInfo.Ts)
if err != nil {
return err
}

for _, vttablet := range clusterInfo.ClusterInstance.Keyspaces[0].Shards[0].Vttablets {
// we need to stop a vttablet only if it is not shutdown
if !vttablet.VttabletProcess.IsShutdown() {
// wait for primary tablet to demote. For all others, it will not wait
err = vttablet.VttabletProcess.WaitForTabletTypes([]string{vttablet.Type})
if err != nil {
return err
}
// Stop the vttablets
err := vttablet.VttabletProcess.TearDown()
if err != nil {
Expand All @@ -224,10 +219,10 @@ func shutdownVttablets(clusterInfo *VTOrcClusterInfo) error {
return nil
}

// demotePrimaryTablet demotes the primary tablet for our shard
func demotePrimaryTablet(ts *topo.Server) (err error) {
// resetShardPrimary resets the shard's primary
func resetShardPrimary(ts *topo.Server) (err error) {
// lock the shard
ctx, unlock, lockErr := ts.LockShard(context.Background(), keyspaceName, shardName, "demotePrimaryTablet-vtorc-endtoend-test")
ctx, unlock, lockErr := ts.LockShard(context.Background(), keyspaceName, shardName, "resetShardPrimary-vtorc-endtoend-test")
if lockErr != nil {
return lockErr
}
Expand All @@ -236,7 +231,6 @@ func demotePrimaryTablet(ts *topo.Server) (err error) {
// update the shard record's primary
if _, err = ts.UpdateShardFields(ctx, keyspaceName, shardName, func(si *topo.ShardInfo) error {
si.PrimaryAlias = nil
si.SetPrimaryTermStartTime(time.Now())
return nil
}); err != nil {
return err
Expand Down Expand Up @@ -339,6 +333,9 @@ func cleanAndStartVttablet(t *testing.T, clusterInfo *VTOrcClusterInfo, vttablet
// reset the binlog
_, err = RunSQL(t, "RESET MASTER", vttablet, "")
require.NoError(t, err)
// set read-only to true
_, err = RunSQL(t, "SET GLOBAL read_only = ON", vttablet, "")
require.NoError(t, err)

// start the vttablet
err = vttablet.VttabletProcess.Setup()
Expand Down Expand Up @@ -913,3 +910,23 @@ func WaitForReadOnlyValue(t *testing.T, curPrimary *cluster.Vttablet, expectValu
}
return false
}

// WaitForSuccessfulRecoveryCount waits until the given recovery name's count of successful runs matches the count expected.
//
// It polls the VTOrc instance's exported vars once per second for up to 15
// seconds and returns as soon as the count matches. If the count still does not
// match after the timeout, the test is marked as failed (without aborting it).
// A recovery name that is absent from the "SuccessfulRecoveries" map is treated
// as a count of 0. While the vars cannot be fetched at all, polling continues
// and no count is assumed.
func WaitForSuccessfulRecoveryCount(t *testing.T, vtorcInstance *cluster.VTOrcProcess, recoveryName string, countExpected int) {
	t.Helper()
	timeout := 15 * time.Second
	startTime := time.Now()
	for time.Since(startTime) < timeout {
		successCount, ok := successfulRecoveryCount(vtorcInstance.GetVars(), recoveryName)
		if ok && successCount == countExpected {
			return
		}
		time.Sleep(time.Second)
	}

	// One last read after the timeout so the failure message reports the latest value.
	successCount, ok := successfulRecoveryCount(vtorcInstance.GetVars(), recoveryName)
	if !ok {
		t.Errorf("could not read the vars exported by VTOrc while waiting for %d successful %q recoveries", countExpected, recoveryName)
		return
	}
	assert.EqualValues(t, countExpected, successCount)
}

// successfulRecoveryCount extracts the number of successful runs of recoveryName
// from the vars returned by GetVars. The boolean result is false when vars is
// nil, i.e. when the vars could not be fetched. A missing or non-numeric entry
// is reported as 0.
func successfulRecoveryCount(vars map[string]any, recoveryName string) (int, bool) {
	if vars == nil {
		return 0, false
	}
	successfulRecoveriesMap, ok := vars["SuccessfulRecoveries"].(map[string]any)
	if !ok {
		return 0, true
	}
	// encoding/json decodes JSON numbers stored in an interface value as float64,
	// so the value has to be converted before it can be compared with an int.
	successCount, ok := successfulRecoveriesMap[recoveryName].(float64)
	if !ok {
		return 0, true
	}
	return int(successCount), true
}
Loading