Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,9 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
productTab := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet

// Wait to finish the copy phase for all tables
catchup(t, customerTab1, workflow, "MoveTables")
catchup(t, customerTab2, workflow, "MoveTables")
workflowType := "MoveTables"
catchup(t, customerTab1, workflow, workflowType)
catchup(t, customerTab2, workflow, workflowType)

// Confirm that the 0 scale decimal field, dec80, is replicated correctly
dec80Replicated := false
Expand Down Expand Up @@ -623,7 +624,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
}

vdiff1(t, ksWorkflow, "")
switchReadsDryRun(t, allCellNames, ksWorkflow, dryRunResultsReadCustomerShard)
switchReadsDryRun(t, workflowType, allCellNames, ksWorkflow, dryRunResultsReadCustomerShard)
switchReads(t, allCellNames, ksWorkflow)
require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", query, query))

Expand Down Expand Up @@ -744,7 +745,8 @@ func reshardCustomer2to4Split(t *testing.T, cells []*Cell, sourceCellOrAlias str
t.Run("reshardCustomer2to4Split", func(t *testing.T) {
ksName := "customer"
counts := map[string]int{"zone1-600": 4, "zone1-700": 5, "zone1-800": 6, "zone1-900": 5}
reshard(t, ksName, "customer", "c2c4", "-80,80-", "-40,40-80,80-c0,c0-", 600, counts, nil, cells, sourceCellOrAlias)
reshard(t, ksName, "customer", "c2c4", "-80,80-", "-40,40-80,80-c0,c0-",
600, counts, nil, nil, cells, sourceCellOrAlias, 1)
waitForRowCount(t, vtgateConn, ksName, "customer", 20)
query := "insert into customer (name) values('yoko')"
execVtgateQuery(t, vtgateConn, ksName, query)
Expand All @@ -756,7 +758,8 @@ func reshardMerchant2to3SplitMerge(t *testing.T) {
t.Run("reshardMerchant2to3SplitMerge", func(t *testing.T) {
ksName := merchantKeyspace
counts := map[string]int{"zone1-1600": 0, "zone1-1700": 2, "zone1-1800": 0}
reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-", 1600, counts, dryRunResultsSwitchWritesM2m3, nil, "")
reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-",
1600, counts, dryRunResultsSwitchReadM2m3, dryRunResultsSwitchWritesM2m3, nil, "", 1)
waitForRowCount(t, vtgateConn, ksName, "merchant", 2)
query := "insert into merchant (mname, category) values('amazon', 'electronics')"
execVtgateQuery(t, vtgateConn, ksName, query)
Expand Down Expand Up @@ -802,7 +805,8 @@ func reshardMerchant3to1Merge(t *testing.T) {
t.Run("reshardMerchant3to1Merge", func(t *testing.T) {
ksName := merchantKeyspace
counts := map[string]int{"zone1-2000": 3}
reshard(t, ksName, "merchant", "m3m1", "-40,40-c0,c0-", "0", 2000, counts, nil, nil, "")
reshard(t, ksName, "merchant", "m3m1", "-40,40-c0,c0-", "0",
2000, counts, nil, nil, nil, "", 1)
waitForRowCount(t, vtgateConn, ksName, "merchant", 3)
query := "insert into merchant (mname, category) values('flipkart', 'electronics')"
execVtgateQuery(t, vtgateConn, ksName, query)
Expand All @@ -814,19 +818,23 @@ func reshardCustomer3to2SplitMerge(t *testing.T) { //-40,40-80,80-c0 => merge/sp
t.Run("reshardCustomer3to2SplitMerge", func(t *testing.T) {
ksName := "customer"
counts := map[string]int{"zone1-1000": 8, "zone1-1100": 8, "zone1-1200": 5}
reshard(t, ksName, "customer", "c4c3", "-40,40-80,80-c0", "-60,60-c0", 1000, counts, nil, nil, "")
reshard(t, ksName, "customer", "c4c3", "-40,40-80,80-c0", "-60,60-c0",
1000, counts, nil, nil, nil, "", 1)
})
}

func reshardCustomer3to1Merge(t *testing.T) { //to unsharded
t.Run("reshardCustomer3to1Merge", func(t *testing.T) {
ksName := "customer"
counts := map[string]int{"zone1-1500": 21}
reshard(t, ksName, "customer", "c3c1", "-60,60-c0,c0-", "0", 1500, counts, nil, nil, "")
reshard(t, ksName, "customer", "c3c1", "-60,60-c0,c0-", "0",
1500, counts, nil, nil, nil, "", 3)
})
}

func reshard(t *testing.T, ksName string, tableName string, workflow string, sourceShards string, targetShards string, tabletIDBase int, counts map[string]int, dryRunResultSwitchWrites []string, cells []*Cell, sourceCellOrAlias string) {
func reshard(t *testing.T, ksName string, tableName string, workflow string, sourceShards string, targetShards string,
tabletIDBase int, counts map[string]int, dryRunResultSwitchReads, dryRunResultSwitchWrites []string, cells []*Cell, sourceCellOrAlias string,
autoIncrementStep int) {
t.Run("reshard", func(t *testing.T) {
if cells == nil {
cells = []*Cell{defaultCell}
Expand All @@ -844,7 +852,8 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
t.Fatal(err)
}
}
if err := vc.VtctlClient.ExecuteCommand("Reshard", "--", "--v1", "--cells="+sourceCellOrAlias, "--tablet_types=replica,primary", ksWorkflow, "--", sourceShards, targetShards); err != nil {
workflowType := "Reshard"
if err := vc.VtctlClient.ExecuteCommand(workflowType, "--", "--v1", "--cells="+sourceCellOrAlias, "--tablet_types=replica,primary", ksWorkflow, "--", sourceShards, targetShards); err != nil {
t.Fatalf("Reshard command failed with %+v\n", err)
}
tablets := vc.getVttabletsInKeyspace(t, defaultCell, ksName, "primary")
Expand All @@ -859,6 +868,9 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
}
}
vdiff1(t, ksWorkflow, "")
if dryRunResultSwitchReads != nil {
switchReadsDryRun(t, workflowType, allCellNames, ksWorkflow, dryRunResultSwitchReads)
}
switchReads(t, allCellNames, ksWorkflow)
if dryRunResultSwitchWrites != nil {
switchWritesDryRun(t, ksWorkflow, dryRunResultSwitchWrites)
Expand Down Expand Up @@ -1193,10 +1205,18 @@ func applyVSchema(t *testing.T, vschema, keyspace string) {
require.NoError(t, err)
}

func switchReadsDryRun(t *testing.T, cells, ksWorkflow string, dryRunResults []string) {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "--", "--cells="+cells, "--tablet_types=replica", "--dry_run", ksWorkflow)
require.NoError(t, err, fmt.Sprintf("SwitchReads DryRun Error: %s: %s", err, output))
validateDryRunResults(t, output, dryRunResults)
func switchReadsDryRun(t *testing.T, workflowType, cells, ksWorkflow string, dryRunResults []string) {
if workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_MoveTables)] &&
workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_Reshard)] {
require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard",
"workflow type specified: %s", workflowType)
}
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly,replica",
"--dry_run", "SwitchTraffic", ksWorkflow)
require.NoError(t, err, fmt.Sprintf("Switching Reads DryRun Error: %s: %s", err, output))
if dryRunResults != nil {
validateDryRunResults(t, output, dryRunResults)
}
}

func switchReads(t *testing.T, cells, ksWorkflow string) {
Expand Down
6 changes: 6 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,9 @@ var dryRunResultsDropSourcesRenameCustomerShard = []string{
"Unlock keyspace customer",
"Unlock keyspace product",
}

var dryRunResultsSwitchReadM2m3 = []string{
"Lock keyspace merchant-type",
"Switch reads from keyspace merchant-type to keyspace merchant-type for shards -80,80- to shards -40,40-c0,c0-",
"Unlock keyspace merchant-type",
}
5 changes: 3 additions & 2 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,8 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID
tickCount++
switch tickCount {
case 1:
reshard(t, "sharded", "customer", "vstreamStopOnReshard", "-80,80-", "-40,40-", baseTabletID+400, nil, nil, nil, defaultCellName)
reshard(t, "sharded", "customer", "vstreamStopOnReshard", "-80,80-",
"-40,40-", baseTabletID+400, nil, nil, nil, nil, defaultCellName, 1)
case 60:
done = true
}
Expand Down Expand Up @@ -499,7 +500,7 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
tickCount++
switch tickCount {
case 1:
reshard(t, "sharded", "customer", "vstreamCopyMultiKeyspaceReshard", "-80,80-", "-40,40-", baseTabletID+400, nil, nil, nil, defaultCellName)
reshard(t, "sharded", "customer", "vstreamCopyMultiKeyspaceReshard", "-80,80-", "-40,40-", baseTabletID+400, nil, nil, nil, nil, defaultCellName, 1)
reshardDone = true
case 60:
done = true
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflowNam
return sw.logs(), nil
}
wr.Logger().Infof("About to switchShardReads: %+v, %+v, %+v", cells, servedTypes, direction)
if err := ts.switchShardReads(ctx, cells, servedTypes, direction); err != nil {
if err := sw.switchShardReads(ctx, cells, servedTypes, direction); err != nil {
ts.Logger().Errorf("switchShardReads failed: %v", err)
return nil, err
}
Expand Down