Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions go/test/endtoend/reparent/emergencyreparent/ers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
Expand Down Expand Up @@ -438,10 +437,7 @@ func TestERSForInitialization(t *testing.T) {
utils.RunSQL(context.Background(), t, "create table vt_insert_test (id bigint, msg varchar(64), primary key (id)) Engine=InnoDB", tablets[0])
utils.CheckPrimaryTablet(t, clusterInstance, tablets[0])
utils.ValidateTopology(t, clusterInstance, false)
time.Sleep(100 * time.Millisecond) // wait for replication to catchup
strArray := utils.GetShardReplicationPositions(t, clusterInstance, utils.KeyspaceName, utils.ShardName, true)
assert.Equal(t, len(tablets), len(strArray))
assert.Contains(t, strArray[0], "primary") // primary first
utils.WaitForReplicationToStart(t, clusterInstance, utils.KeyspaceName, utils.ShardName, len(tablets), true)
utils.ConfirmReplication(t, tablets[0], tablets[1:])
}

Expand Down
4 changes: 1 addition & 3 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ func TestReparentGraceful(t *testing.T) {
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

// Run this to make sure it succeeds.
strArray := utils.GetShardReplicationPositions(t, clusterInstance, utils.KeyspaceName, utils.ShardName, false)
assert.Equal(t, 4, len(strArray)) // one primary, three replicas
assert.Contains(t, strArray[0], "primary") // primary first
utils.WaitForReplicationToStart(t, clusterInstance, utils.KeyspaceName, utils.ShardName, len(tablets), true)

// Perform a graceful reparent operation
utils.Prs(t, clusterInstance, tablets[1])
Expand Down
56 changes: 38 additions & 18 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ var (
primary key (id)
) Engine=InnoDB
`
cell1 = "zone1"
cell2 = "zone2"
ShardName = "0"
KeyspaceShard = KeyspaceName + "/" + ShardName
cell1 = "zone1"
cell2 = "zone2"
ShardName = "0"
KeyspaceShard = KeyspaceName + "/" + ShardName
replicationWaitTimeout = time.Duration(15 * time.Second)
)

//region cluster setup/teardown
Expand Down Expand Up @@ -198,10 +199,7 @@ func setupShard(ctx context.Context, t *testing.T, clusterInstance *cluster.Loca
CheckPrimaryTablet(t, clusterInstance, tablets[0])

ValidateTopology(t, clusterInstance, false)
time.Sleep(100 * time.Millisecond) // wait for replication to catchup
strArray := GetShardReplicationPositions(t, clusterInstance, KeyspaceName, shardName, true)
assert.Equal(t, len(tablets), len(strArray))
assert.Contains(t, strArray[0], "primary") // primary first
WaitForReplicationToStart(t, clusterInstance, KeyspaceName, shardName, len(tablets), true)
}

func setupClusterLegacy(ctx context.Context, t *testing.T, shardName string, cells []string, numTablets []int, enableSemiSync bool) *cluster.LocalProcessCluster {
Expand Down Expand Up @@ -321,10 +319,7 @@ func setupShardLegacy(ctx context.Context, t *testing.T, clusterInstance *cluste
CheckPrimaryTablet(t, clusterInstance, tablets[0])

ValidateTopology(t, clusterInstance, false)
time.Sleep(100 * time.Millisecond) // wait for replication to catchup
strArray := GetShardReplicationPositions(t, clusterInstance, KeyspaceName, shardName, true)
assert.Equal(t, len(tablets), len(strArray))
assert.Contains(t, strArray[0], "primary") // primary first
WaitForReplicationToStart(t, clusterInstance, KeyspaceName, shardName, len(tablets), true)
}

//endregion
Expand Down Expand Up @@ -455,7 +450,6 @@ func ConfirmReplication(t *testing.T, primary *cluster.Vttablet, replicas []*clu
// insert data into the new primary, check the connected replica work
insertSQL := fmt.Sprintf(insertSQL, n, n)
RunSQL(ctx, t, insertSQL, primary)
time.Sleep(100 * time.Millisecond)
for _, tab := range replicas {
err := CheckInsertedValues(ctx, t, tab, n)
require.NoError(t, err)
Expand Down Expand Up @@ -518,20 +512,28 @@ func isHealthyPrimaryTablet(t *testing.T, clusterInstance *cluster.LocalProcessC

// CheckInsertedValues checks that the given value is present in the given tablet
func CheckInsertedValues(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, index int) error {
query := fmt.Sprintf("select msg from vt_insert_test where id=%d", index)
tabletParams := getMysqlConnParam(tablet)
conn, err := mysql.Connect(ctx, &tabletParams)
require.Nil(t, err)
defer conn.Close()

// wait until it gets the data
timeout := time.Now().Add(15 * time.Second)
timeout := time.Now().Add(replicationWaitTimeout)
i := 0
for time.Now().Before(timeout) {
selectSQL := fmt.Sprintf("select msg from vt_insert_test where id=%d", index)
qr := RunSQL(ctx, t, selectSQL, tablet)
if len(qr.Rows) == 1 {
// We'll get a mysql.ERNoSuchTable (1146) error if the CREATE TABLE has not replicated yet and
// it's possible that we get other ephemeral errors too, so we make the tests more robust by
// retrying with the timeout.
qr, err := conn.ExecuteFetch(query, 1, true)
if err == nil && len(qr.Rows) == 1 {
return nil
}
t := time.Duration(300 * i)
time.Sleep(t * time.Millisecond)
i++
}
return fmt.Errorf("data is not yet replicated on tablet %s", tablet.Alias)
return fmt.Errorf("data did not get replicated on tablet %s within the timeout of %v", tablet.Alias, replicationWaitTimeout)
}

func CheckSemiSyncSetupCorrectly(t *testing.T, tablet *cluster.Vttablet, semiSyncVal string) {
Expand Down Expand Up @@ -618,6 +620,7 @@ func GetNewPrimary(t *testing.T, clusterInstance *cluster.LocalProcessCluster) *
}

// GetShardReplicationPositions gets the shards replication positions.
// This should not generally be called directly, instead use the WaitForReplicationToCatchup method.
func GetShardReplicationPositions(t *testing.T, clusterInstance *cluster.LocalProcessCluster, keyspaceName, shardName string, doPrint bool) []string {
output, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput(
"ShardReplicationPositions", fmt.Sprintf("%s/%s", keyspaceName, shardName))
Expand All @@ -635,6 +638,23 @@ func GetShardReplicationPositions(t *testing.T, clusterInstance *cluster.LocalPr
return strArray
}

func WaitForReplicationToStart(t *testing.T, clusterInstance *cluster.LocalProcessCluster, keyspaceName, shardName string, tabletCnt int, doPrint bool) {
tck := time.NewTicker(500 * time.Millisecond)
for {
select {
case <-tck.C:
strArray := GetShardReplicationPositions(t, clusterInstance, KeyspaceName, shardName, true)
if len(strArray) == tabletCnt && strings.Contains(strArray[0], "primary") { // primary first
return
}
case <-time.After(replicationWaitTimeout):
require.FailNow(t, fmt.Sprintf("replication did not start everywhere in %s/%s within the timeout of %v",
keyspaceName, shardName, replicationWaitTimeout))
return
}
}
}

// endregion

// CheckReplicaStatus checks the replication status and asserts that the replication is stopped
Expand Down