Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/common/env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ mkdir -p "${VTDATAROOT}/tmp"
# In your own environment you may prefer to use config files,
# such as ~/.my.cnf

alias mysql="command mysql -h 127.0.0.1 -P 15306"
alias mysql="command mysql --no-defaults -h 127.0.0.1 -P 15306"
alias vtctlclient="command vtctlclient --server localhost:15999 --log_dir ${VTDATAROOT}/tmp --alsologtostderr"
alias vtctldclient="command vtctldclient --server localhost:15999"

Expand Down
48 changes: 48 additions & 0 deletions go/test/endtoend/cluster/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"testing"
"time"

"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -41,6 +42,8 @@ var (
tmClient = tmc.NewClient()
dbCredentialFile string
InsertTabletTemplateKsID = `insert into %s (id, msg) values (%d, '%s') /* id:%d */`
defaultOperationTimeout = 60 * time.Second
defeaultRetryDelay = 1 * time.Second
)

// Restart restarts vttablet and mysql.
Expand Down Expand Up @@ -381,3 +384,48 @@ func WaitForTabletSetup(vtctlClientProcess *VtctlClientProcess, expectedTablets

return fmt.Errorf("all %d tablet are not in expected state %s", expectedTablets, expectedStatus)
}

// WaitForHealthyShard waits for the given shard info record in the topo
// server to list a tablet (alias and uid) as the primary serving tablet
// for the shard. This is done using "vtctldclient GetShard" and parsing
// its JSON output. All other watchers should then also see this shard
// info status as well.
//
// It polls every defeaultRetryDelay (sic — the declaration carries the
// typo) and gives up after defaultOperationTimeout, returning an error
// that includes the last GetShard output and the last parse error seen.
func WaitForHealthyShard(vtctldclient *VtctldClientProcess, keyspace, shard string) error {
	var (
		tmr  = time.NewTimer(defaultOperationTimeout)
		res  string
		err  error
		json []byte
		cell string
		uid  int64
	)
	// Stop the timer when we return early (success or error) so its
	// resources are released instead of lingering until it fires.
	defer tmr.Stop()
	for {
		res, err = vtctldclient.ExecuteCommandWithOutput("GetShard", fmt.Sprintf("%s/%s", keyspace, shard))
		if err != nil {
			return err
		}
		json = []byte(res)

		// A missing primary_alias path just means no primary has been
		// recorded yet, so KeyPathNotFoundError is expected and not fatal;
		// any other parse error is.
		cell, err = jsonparser.GetString(json, "shard", "primary_alias", "cell")
		if err != nil && err != jsonparser.KeyPathNotFoundError {
			return err
		}
		uid, err = jsonparser.GetInt(json, "shard", "primary_alias", "uid")
		if err != nil && err != jsonparser.KeyPathNotFoundError {
			return err
		}

		// A non-empty cell plus a positive uid means the topo now lists a
		// primary serving tablet for the shard — we're done.
		if cell != "" && uid > 0 {
			return nil
		}

		// Non-blocking check of the deadline; fall through to retry if it
		// has not expired yet.
		select {
		case <-tmr.C:
			return fmt.Errorf("timed out waiting for the %s/%s shard to become healthy in the topo after %v; last seen status: %s; last seen error: %v",
				keyspace, shard, defaultOperationTimeout, res, err)
		default:
		}

		time.Sleep(defeaultRetryDelay)
	}
}
23 changes: 15 additions & 8 deletions go/test/endtoend/vreplication/materialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ limitations under the License.
package vreplication

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
)

const smSchema = `
Expand Down Expand Up @@ -68,6 +69,7 @@ func testShardedMaterialize(t *testing.T) {
vc = NewVitessCluster(t, "TestShardedMaterialize", allCells, mainClusterConfig)
ks1 := "ks1"
ks2 := "ks2"
shard := "0"
require.NotNil(t, vc)
defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets
defer func() { defaultReplicas = 1 }()
Expand All @@ -78,15 +80,17 @@ func testShardedMaterialize(t *testing.T) {
vc.AddKeyspace(t, []*Cell{defaultCell}, ks1, "0", smVSchema, smSchema, defaultReplicas, defaultRdonly, 100, nil)
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", ks1, "0"), 1)
err := cluster.WaitForHealthyShard(vc.VtctldClient, ks1, shard)
require.NoError(t, err)

vc.AddKeyspace(t, []*Cell{defaultCell}, ks2, "0", smVSchema, smSchema, defaultReplicas, defaultRdonly, 200, nil)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", ks2, "0"), 1)
err = cluster.WaitForHealthyShard(vc.VtctldClient, ks2, shard)
require.NoError(t, err)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)
_, err := vtgateConn.ExecuteFetch(initDataQuery, 0, false)
_, err = vtgateConn.ExecuteFetch(initDataQuery, 0, false)
require.NoError(t, err)
materialize(t, smMaterializeSpec)
tab := vc.getPrimaryTablet(t, ks2, "0")
Expand Down Expand Up @@ -184,6 +188,7 @@ func testMaterialize(t *testing.T) {
vc = NewVitessCluster(t, "TestMaterialize", allCells, mainClusterConfig)
sourceKs := "source"
targetKs := "target"
shard := "0"
require.NotNil(t, vc)
defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets
defer func() { defaultReplicas = 1 }()
Expand All @@ -194,19 +199,21 @@ func testMaterialize(t *testing.T) {
vc.AddKeyspace(t, []*Cell{defaultCell}, sourceKs, "0", smMaterializeVSchemaSource, smMaterializeSchemaSource, defaultReplicas, defaultRdonly, 300, nil)
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", sourceKs, "0"), 1)
err := cluster.WaitForHealthyShard(vc.VtctldClient, sourceKs, shard)
require.NoError(t, err)

vc.AddKeyspace(t, []*Cell{defaultCell}, targetKs, "0", smMaterializeVSchemaTarget, smMaterializeSchemaTarget, defaultReplicas, defaultRdonly, 400, nil)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", targetKs, "0"), 1)
err = cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard)
require.NoError(t, err)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

_, err := vtgateConn.ExecuteFetch(materializeInitDataQuery, 0, false)
_, err = vtgateConn.ExecuteFetch(materializeInitDataQuery, 0, false)
require.NoError(t, err)

ks2Primary := vc.getPrimaryTablet(t, targetKs, "0")
ks2Primary := vc.getPrimaryTablet(t, targetKs, shard)
_, err = ks2Primary.QueryTablet(customFunc, targetKs, true)
require.NoError(t, err)

Expand Down
8 changes: 5 additions & 3 deletions go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
)

func insertInitialDataIntoExternalCluster(t *testing.T, conn *mysql.Conn) {
Expand Down Expand Up @@ -55,9 +56,10 @@ func TestMigrate(t *testing.T) {

defaultCell = vc.Cells[defaultCellName]
vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)
err := cluster.WaitForHealthyShard(vc.VtctldClient, "product", "0")
require.NoError(t, err)
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
Expand All @@ -76,12 +78,12 @@ func TestMigrate(t *testing.T) {
extVtgate := extCell2.Vtgates[0]
require.NotNil(t, extVtgate)

extVtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "rating", "0"), 1)
err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "0")
require.NoError(t, err)
verifyClusterHealth(t, extVc)
extVtgateConn := getConnection(t, extVc.ClusterConfig.hostname, extVc.ClusterConfig.vtgateMySQLPort)
insertInitialDataIntoExternalCluster(t, extVtgateConn)

var err error
var output, expected string
ksWorkflow := "product.e1"

Expand Down
3 changes: 2 additions & 1 deletion go/test/endtoend/vreplication/performance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ create table customer(cid int, name varbinary(128), meta json default null, typ
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)

vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1)
err := cluster.WaitForHealthyShard(vc.VtctldClient, "product", "0")
require.NoError(t, err)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
Expand Down
23 changes: 10 additions & 13 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,8 @@ func setupCluster(t *testing.T) *VitessCluster {

vtgate = zone1.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1)
err := cluster.WaitForHealthyShard(vc.VtctldClient, "product", "0")
require.NoError(t, err)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", "product", "0"), 2)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", "product", "0"), 1)

Expand All @@ -590,12 +591,10 @@ func setupCustomerKeyspace(t *testing.T) {
customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200, nil); err != nil {
t.Fatal(err)
}
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "customer", "-80"), 1); err != nil {
t.Fatal(err)
}
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "customer", "80-"), 1); err != nil {
t.Fatal(err)
}
err := cluster.WaitForHealthyShard(vc.VtctldClient, "customer", "-80")
require.NoError(t, err)
err = cluster.WaitForHealthyShard(vc.VtctldClient, "customer", "80-")
require.NoError(t, err)
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", "customer", "-80"), 2); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -623,9 +622,8 @@ func setupCustomer2Keyspace(t *testing.T) {
t.Fatal(err)
}
for _, c2shard := range c2shards {
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", c2keyspace, c2shard), 1); err != nil {
t.Fatal(err)
}
err := cluster.WaitForHealthyShard(vc.VtctldClient, c2keyspace, c2shard)
require.NoError(t, err)
if defaultReplicas > 0 {
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", c2keyspace, c2shard), defaultReplicas); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -758,9 +756,8 @@ func createAdditionalCustomerShards(t *testing.T, shards string) {
arrTargetShardNames := strings.Split(shards, ",")

for _, shardName := range arrTargetShardNames {
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", ksName, shardName), 1); err != nil {
require.NoError(t, err)
}
err := cluster.WaitForHealthyShard(vc.VtctldClient, ksName, shardName)
require.NoError(t, err)
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", ksName, shardName), 2); err != nil {
require.NoError(t, err)
}
Expand Down
9 changes: 5 additions & 4 deletions go/test/endtoend/vreplication/time_zone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestMoveTablesTZ(t *testing.T) {
workflow := "tz"
sourceKs := "product"
targetKs := "customer"
shard := "0"
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
ksReverseWorkflow := fmt.Sprintf("%s.%s_reverse", sourceKs, workflow)

Expand All @@ -51,7 +52,8 @@ func TestMoveTablesTZ(t *testing.T) {

vtgate = cell1.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1)
err := cluster.WaitForHealthyShard(vc.VtctldClient, sourceKs, shard)
require.NoError(t, err)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
Expand Down Expand Up @@ -87,9 +89,8 @@ func TestMoveTablesTZ(t *testing.T) {
if _, err := vc.AddKeyspace(t, cells, targetKs, "0", customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200, targetKsOpts); err != nil {
t.Fatal(err)
}
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "customer", "0"), 1); err != nil {
t.Fatal(err)
}
err = cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard)
require.NoError(t, err)

defaultCell := vc.Cells["zone1"]
custKs := vc.Cells[defaultCell.Name].Keyspaces[targetKs]
Expand Down
8 changes: 5 additions & 3 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
)

type testCase struct {
Expand Down Expand Up @@ -121,7 +123,7 @@ func TestVDiff2(t *testing.T) {
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
for _, shard := range sourceShards {
require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", sourceKs, shard), 1))
require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, sourceKs, shard))
}

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
Expand All @@ -139,7 +141,7 @@ func TestVDiff2(t *testing.T) {
_, err := vc.AddKeyspace(t, cells, targetKs, strings.Join(targetShards, ","), customerVSchema, customerSchema, 0, 0, 200, targetKsOpts)
require.NoError(t, err)
for _, shard := range targetShards {
require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", targetKs, shard), 1))
require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard))
}

for _, tc := range testCases {
Expand All @@ -155,7 +157,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell)
tks := vc.Cells[cells[0].Name].Keyspaces[tc.targetKs]
require.NoError(t, vc.AddShards(t, cells, tks, tc.targetShards, 0, 0, tc.tabletBaseID, targetKsOpts))
for _, shard := range arrTargetShards {
require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", tc.targetKs, shard), 1))
require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, tc.targetKs, shard))
}
}
ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow)
Expand Down
Loading