diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 74e724ad52e..3c4da958374 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -589,8 +589,9 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl productTab := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet // Wait to finish the copy phase for all tables - catchup(t, customerTab1, workflow, "MoveTables") - catchup(t, customerTab2, workflow, "MoveTables") + workflowType := "MoveTables" + catchup(t, customerTab1, workflow, workflowType) + catchup(t, customerTab2, workflow, workflowType) // Confirm that the 0 scale decimal field, dec80, is replicated correctly dec80Replicated := false @@ -623,7 +624,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl } vdiff1(t, ksWorkflow, "") - switchReadsDryRun(t, allCellNames, ksWorkflow, dryRunResultsReadCustomerShard) + switchReadsDryRun(t, workflowType, allCellNames, ksWorkflow, dryRunResultsReadCustomerShard) switchReads(t, allCellNames, ksWorkflow) require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", query, query)) @@ -744,7 +745,8 @@ func reshardCustomer2to4Split(t *testing.T, cells []*Cell, sourceCellOrAlias str t.Run("reshardCustomer2to4Split", func(t *testing.T) { ksName := "customer" counts := map[string]int{"zone1-600": 4, "zone1-700": 5, "zone1-800": 6, "zone1-900": 5} - reshard(t, ksName, "customer", "c2c4", "-80,80-", "-40,40-80,80-c0,c0-", 600, counts, nil, cells, sourceCellOrAlias) + reshard(t, ksName, "customer", "c2c4", "-80,80-", "-40,40-80,80-c0,c0-", + 600, counts, nil, nil, cells, sourceCellOrAlias, 1) waitForRowCount(t, vtgateConn, ksName, "customer", 20) query := "insert into customer (name) values('yoko')" execVtgateQuery(t, vtgateConn, ksName, query) @@ -756,7 +758,8 @@ func reshardMerchant2to3SplitMerge(t *testing.T) { t.Run("reshardMerchant2to3SplitMerge", func(t *testing.T) { ksName := merchantKeyspace counts := map[string]int{"zone1-1600": 0, "zone1-1700": 2, "zone1-1800": 0} - reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-", 1600, counts, dryRunResultsSwitchWritesM2m3, nil, "") + reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-", + 1600, counts, dryRunResultsSwitchReadM2m3, dryRunResultsSwitchWritesM2m3, nil, "", 1) waitForRowCount(t, vtgateConn, ksName, "merchant", 2) query := "insert into merchant (mname, category) values('amazon', 'electronics')" execVtgateQuery(t, vtgateConn, ksName, query) @@ -802,7 +805,8 @@ func reshardMerchant3to1Merge(t *testing.T) { t.Run("reshardMerchant3to1Merge", func(t *testing.T) { ksName := merchantKeyspace counts := map[string]int{"zone1-2000": 3} - reshard(t, ksName, "merchant", "m3m1", "-40,40-c0,c0-", "0", 2000, counts, nil, nil, "") + reshard(t, ksName, "merchant", "m3m1", "-40,40-c0,c0-", "0", + 2000, counts, nil, nil, nil, "", 1) waitForRowCount(t, vtgateConn, ksName, "merchant", 3) query := "insert into merchant (mname, category) values('flipkart', 'electronics')" execVtgateQuery(t, vtgateConn, ksName, query) @@ -814,7 +818,8 @@ func reshardCustomer3to2SplitMerge(t *testing.T) { //-40,40-80,80-c0 => merge/sp t.Run("reshardCustomer3to2SplitMerge", func(t *testing.T) { ksName := "customer" counts := map[string]int{"zone1-1000": 8, "zone1-1100": 8, "zone1-1200": 5} - reshard(t, ksName, "customer", "c4c3", "-40,40-80,80-c0", "-60,60-c0", 1000, counts, nil, nil, "") + reshard(t, ksName, "customer", "c4c3", "-40,40-80,80-c0", "-60,60-c0", + 1000, counts, nil, nil, nil, "", 1) }) } @@ -822,11 +827,14 @@ func reshardCustomer3to1Merge(t *testing.T) { //to unsharded t.Run("reshardCustomer3to1Merge", func(t *testing.T) { ksName := "customer" counts := map[string]int{"zone1-1500": 21} - reshard(t, ksName, "customer", "c3c1", "-60,60-c0,c0-", "0", 1500, counts, nil, nil, "") + reshard(t, ksName, "customer", "c3c1", "-60,60-c0,c0-", "0", + 1500, counts, nil, nil, nil, "", 3) }) } -func reshard(t *testing.T, ksName string, tableName string, workflow string, sourceShards string, targetShards string, tabletIDBase int, counts map[string]int, dryRunResultSwitchWrites []string, cells []*Cell, sourceCellOrAlias string) { +func reshard(t *testing.T, ksName string, tableName string, workflow string, sourceShards string, targetShards string, + tabletIDBase int, counts map[string]int, dryRunResultSwitchReads, dryRunResultSwitchWrites []string, cells []*Cell, sourceCellOrAlias string, + autoIncrementStep int) { t.Run("reshard", func(t *testing.T) { if cells == nil { cells = []*Cell{defaultCell} @@ -844,7 +852,8 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou t.Fatal(err) } } - if err := vc.VtctlClient.ExecuteCommand("Reshard", "--", "--v1", "--cells="+sourceCellOrAlias, "--tablet_types=replica,primary", ksWorkflow, "--", sourceShards, targetShards); err != nil { + workflowType := "Reshard" + if err := vc.VtctlClient.ExecuteCommand(workflowType, "--", "--v1", "--cells="+sourceCellOrAlias, "--tablet_types=replica,primary", ksWorkflow, "--", sourceShards, targetShards); err != nil { t.Fatalf("Reshard command failed with %+v\n", err) } tablets := vc.getVttabletsInKeyspace(t, defaultCell, ksName, "primary") @@ -859,6 +868,9 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou } } vdiff1(t, ksWorkflow, "") + if dryRunResultSwitchReads != nil { + switchReadsDryRun(t, workflowType, allCellNames, ksWorkflow, dryRunResultSwitchReads) + } switchReads(t, allCellNames, ksWorkflow) if dryRunResultSwitchWrites != nil { switchWritesDryRun(t, ksWorkflow, dryRunResultSwitchWrites) @@ -1193,10 +1205,18 @@ func applyVSchema(t *testing.T, vschema, keyspace string) { require.NoError(t, err) } -func switchReadsDryRun(t *testing.T, cells, ksWorkflow string, dryRunResults []string) { - output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "--", "--cells="+cells, "--tablet_types=replica", "--dry_run", ksWorkflow) - require.NoError(t, err, fmt.Sprintf("SwitchReads DryRun Error: %s: %s", err, output)) - validateDryRunResults(t, output, dryRunResults) +func switchReadsDryRun(t *testing.T, workflowType, cells, ksWorkflow string, dryRunResults []string) { + if workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_MoveTables)] && + workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_Reshard)] { + require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard", + "workflow type specified: %s", workflowType) + } + output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly,replica", + "--dry_run", "SwitchTraffic", ksWorkflow) + require.NoError(t, err, fmt.Sprintf("Switching Reads DryRun Error: %s: %s", err, output)) + if dryRunResults != nil { + validateDryRunResults(t, output, dryRunResults) + } } func switchReads(t *testing.T, cells, ksWorkflow string) { diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index 8329ae5e490..d13ac006daf 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -125,3 +125,9 @@ var dryRunResultsDropSourcesRenameCustomerShard = []string{ "Unlock keyspace customer", "Unlock keyspace product", } + +var dryRunResultsSwitchReadM2m3 = []string{ + "Lock keyspace merchant-type", + "Switch reads from keyspace merchant-type to keyspace merchant-type for shards -80,80- to shards -40,40-c0,c0-", + "Unlock keyspace merchant-type", +} diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index 6899e7c583b..d3b409d7cf4 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -369,7 +369,8 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID tickCount++ switch tickCount { case 1: - reshard(t, "sharded", "customer", "vstreamStopOnReshard", "-80,80-", "-40,40-", baseTabletID+400, nil, nil, nil, defaultCellName) + reshard(t, "sharded", "customer", "vstreamStopOnReshard", "-80,80-", + "-40,40-", baseTabletID+400, nil, nil, nil, nil, defaultCellName, 1) case 60: done = true } @@ -499,7 +500,7 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven tickCount++ switch tickCount { case 1: - reshard(t, "sharded", "customer", "vstreamCopyMultiKeyspaceReshard", "-80,80-", "-40,40-", baseTabletID+400, nil, nil, nil, defaultCellName) + reshard(t, "sharded", "customer", "vstreamCopyMultiKeyspaceReshard", "-80,80-", "-40,40-", baseTabletID+400, nil, nil, nil, nil, defaultCellName, 1) reshardDone = true case 60: done = true diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 9a98dfe24b1..6dd04501d86 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -397,7 +397,7 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflowNam return sw.logs(), nil } wr.Logger().Infof("About to switchShardReads: %+v, %+v, %+v", cells, servedTypes, direction) - if err := ts.switchShardReads(ctx, cells, servedTypes, direction); err != nil { + if err := sw.switchShardReads(ctx, cells, servedTypes, direction); err != nil { ts.Logger().Errorf("switchShardReads failed: %v", err) return nil, err }