diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 264c748a5bb..44bd1e790d7 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -23,6 +23,7 @@ import ( "net/http" "os/exec" "regexp" + "sort" "strconv" "strings" "testing" @@ -503,3 +504,40 @@ func confirmWorkflowHasCopiedNoData(t *testing.T, targetKS, workflow string) { } } } + +// getShardRoutingRules returns the shard routing rules stored in the +// topo. It returns the rules sorted by shard,to_keyspace and with all +// newlines and whitespace removed so that we have predictable, +// compact, and easy to compare results for tests. +func getShardRoutingRules(t *testing.T) string { + output, err := osExec(t, "vtctldclient", []string{"--server", getVtctldGRPCURL(), "GetShardRoutingRules"}) + log.Infof("GetShardRoutingRules err: %+v, output: %+v", err, output) + require.Nilf(t, err, output) + require.NotNil(t, output) + + // Sort the rules by shard,to_keyspace + jsonOutput := gjson.Parse(output) + rules := jsonOutput.Get("rules").Array() + sort.Slice(rules, func(i, j int) bool { + shardI := rules[i].Get("shard").String() + shardJ := rules[j].Get("shard").String() + if shardI == shardJ { + return rules[i].Get("to_keyspace").String() < rules[j].Get("to_keyspace").String() + } + return shardI < shardJ + }) + sb := strings.Builder{} + for i := 0; i < len(rules); i++ { + if i > 0 { + sb.WriteString(",") + } + sb.WriteString(rules[i].String()) + } + output = fmt.Sprintf(`{"rules":[%s]}`, sb.String()) + + // Remove newlines and whitespace + re := regexp.MustCompile(`[\n\s]+`) + output = re.ReplaceAllString(output, "") + output = strings.TrimSpace(output) + return output +} diff --git a/go/test/endtoend/vreplication/partial_movetables_test.go b/go/test/endtoend/vreplication/partial_movetables_test.go new file mode 100644 index 00000000000..c130000e53a --- /dev/null +++ 
b/go/test/endtoend/vreplication/partial_movetables_test.go @@ -0,0 +1,275 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" + + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/wrangler" +) + +// TestPartialMoveTables tests partial move tables by moving each +// customer shard -- -80,80- -- one at a time to customer2. +func TestPartialMoveTables(t *testing.T) { + origDefaultRdonly := defaultRdonly + defer func() { + defaultRdonly = origDefaultRdonly + }() + defaultRdonly = 1 + origExtraVTGateArgs := extraVTGateArgs + // We need to enable shard routing for partial movetables routing. + // And we need to disable schema change tracking in vtgate as we want + // to test query routing using a query we know will fail as it's + // using a column that doesn't exist in the schema -- this way we + // get the target shard details back in the error message. If schema + // tracking is enabled then vtgate will produce an error about the + // unknown symbol before attempting to route the query. + extraVTGateArgs = append(extraVTGateArgs, []string{ + "--enable-partial-keyspace-migration", + "--schema_change_signal=false", + }...)
+ defer func() { + extraVTGateArgs = origExtraVTGateArgs + }() + vc = setupCluster(t) + defer vtgateConn.Close() + defer vc.TearDown(t) + setupCustomerKeyspace(t) + + // Move customer table from unsharded product keyspace to + // sharded customer keyspace. + createMoveTablesWorkflow(t, "customer") + tstWorkflowSwitchReadsAndWrites(t) + tstWorkflowComplete(t) + + emptyGlobalRoutingRules := "{}\n" + + // These should be listed in shard order + emptyShardRoutingRules := `{"rules":[]}` + preCutoverShardRoutingRules := `{"rules":[{"from_keyspace":"customer2","to_keyspace":"customer","shard":"-80"},{"from_keyspace":"customer2","to_keyspace":"customer","shard":"80-"}]}` + halfCutoverShardRoutingRules := `{"rules":[{"from_keyspace":"customer2","to_keyspace":"customer","shard":"-80"},{"from_keyspace":"customer","to_keyspace":"customer2","shard":"80-"}]}` + postCutoverShardRoutingRules := `{"rules":[{"from_keyspace":"customer","to_keyspace":"customer2","shard":"-80"},{"from_keyspace":"customer","to_keyspace":"customer2","shard":"80-"}]}` + + // Remove any manually applied shard routing rules as these + // should be set by SwitchTraffic. + applyShardRoutingRules(t, emptyShardRoutingRules) + require.Equal(t, emptyShardRoutingRules, getShardRoutingRules(t)) + + // Now setup the customer2 keyspace so we can do a partial + // move tables for one of the two shards: 80-. 
+ defaultRdonly = 0 + setupCustomer2Keyspace(t) + currentWorkflowType = wrangler.MoveTablesWorkflow + wfName := "partial80Dash" + sourceKs := "customer" + targetKs := "customer2" + shard := "80-" + ksWf := fmt.Sprintf("%s.%s", targetKs, wfName) + + // start the partial movetables for 80- + err := tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs, + "customer", workflowActionCreate, "", shard, "") + require.NoError(t, err) + targetTab1 = vc.getPrimaryTablet(t, targetKs, shard) + catchup(t, targetTab1, wfName, "Partial MoveTables Customer to Customer2") + vdiff1(t, ksWf, "") + + waitForRowCount(t, vtgateConn, "customer", "customer", 3) // customer: all shards + waitForRowCount(t, vtgateConn, "customer2", "customer", 3) // customer2: all shards + waitForRowCount(t, vtgateConn, "customer2:80-", "customer", 2) // customer2: 80- + + confirmGlobalRoutingToSource := func() { + output, err := vc.VtctlClient.ExecuteCommandWithOutput("GetRoutingRules") + require.NoError(t, err) + result := gjson.Get(output, "rules") + result.ForEach(func(attributeKey, attributeValue gjson.Result) bool { + // 0 is the keyspace and 1 is optional tablename[@tablettype] + fromKsTbl := strings.Split(attributeValue.Get("fromTable").String(), ".") + // 0 is the keyspace and 1 is the tablename + toKsTbl := strings.Split(attributeValue.Get("toTables.0").String(), ".") + // All tables in the customer and customer2 keyspaces should be + // routed to the customer keyspace. 
+ if fromKsTbl[0] == "customer" || fromKsTbl[0] == "customer2" { + require.Equal(t, "customer", toKsTbl[0]) + } + return true + }) + } + + // This query uses an ID that should always get routed to shard 80- + shard80MinusRoutedQuery := "select name from customer where cid = 1 and noexistcol = 'foo'" + // This query uses an ID that should always get routed to shard -80 + shardMinus80RoutedQuery := "select name from customer where cid = 2 and noexistcol = 'foo'" + + // reset any existing vtgate connection state + vtgateConn.Close() + vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + + // Global routing rules should be in place with everything going to + // the source keyspace (customer). + confirmGlobalRoutingToSource() + + // Shard routing rules should now also be in place with everything + // going to the source keyspace (customer). + require.Equal(t, preCutoverShardRoutingRules, getShardRoutingRules(t)) + + // Confirm shard targeting works before we switch any traffic. + // Everything should be routed to the source keyspace (customer). 
+ + log.Infof("Testing reverse route (target->source) for shard being switched") + _, err = vtgateConn.ExecuteFetch("use `customer2:80-`", 0, false) + require.NoError(t, err) + _, err = vtgateConn.ExecuteFetch(shard80MinusRoutedQuery, 0, false) + require.Error(t, err) + require.Contains(t, err.Error(), "target: customer.80-.primary", "Query was routed to the target before any SwitchTraffic") + + log.Infof("Testing reverse route (target->source) for shard NOT being switched") + _, err = vtgateConn.ExecuteFetch("use `customer2:-80`", 0, false) + require.NoError(t, err) + _, err = vtgateConn.ExecuteFetch(shardMinus80RoutedQuery, 0, false) + require.Error(t, err) + require.Contains(t, err.Error(), "target: customer.-80.primary", "Query was routed to the target before any SwitchTraffic") + + // Switch all traffic for the shard + require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "")) + expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", + targetKs, wfName, shard, shard) + require.Equal(t, expectedSwitchOutput, lastOutput) + + // Confirm global routing rules -- everything should still be routed + // to the source side, customer, globally. + confirmGlobalRoutingToSource() + + // Confirm shard routing rules -- all traffic for the 80- shard should be + // routed into the customer2 keyspace, overriding the global routing rules. 
+ require.Equal(t, halfCutoverShardRoutingRules, getShardRoutingRules(t)) + + // reset any existing vtgate connection state + vtgateConn.Close() + vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + + // No shard targeting + _, err = vtgateConn.ExecuteFetch(shard80MinusRoutedQuery, 0, false) + require.Error(t, err) + require.Contains(t, err.Error(), "target: customer2.80-.primary", "Query was routed to the source after partial SwitchTraffic") + _, err = vtgateConn.ExecuteFetch(shardMinus80RoutedQuery, 0, false) + require.Error(t, err) + require.Contains(t, err.Error(), "target: customer.-80.primary", "Query was routed to the target before partial SwitchTraffic") + + // Shard targeting + _, err = vtgateConn.ExecuteFetch("use `customer2:80-`", 0, false) + require.NoError(t, err) + _, err = vtgateConn.ExecuteFetch(shard80MinusRoutedQuery, 0, false) + require.Error(t, err) + require.Contains(t, err.Error(), "target: customer2.80-.primary", "Query was routed to the source after partial SwitchTraffic") + _, err = vtgateConn.ExecuteFetch("use `customer:80-`", 0, false) + require.NoError(t, err) + _, err = vtgateConn.ExecuteFetch(shard80MinusRoutedQuery, 0, false) + require.Error(t, err) + require.Contains(t, err.Error(), "target: customer2.80-.primary", "Query was routed to the source after partial SwitchTraffic") + + // Tablet type targeting + _, err = vtgateConn.ExecuteFetch("use `customer2@replica`", 0, false) + require.NoError(t, err) + _, err = vtgateConn.ExecuteFetch(shard80MinusRoutedQuery, 0, false) + require.Error(t, err) + require.Contains(t, err.Error(), "target: customer2.80-.replica", "Query was routed to the source after partial SwitchTraffic") + _, err = vtgateConn.ExecuteFetch(shardMinus80RoutedQuery, 0, false) + require.Error(t, err) + require.Contains(t, err.Error(), "target: customer.-80.replica", "Query was routed to the target before partial SwitchTraffic") + _, err = 
vtgateConn.ExecuteFetch("use `customer@replica`", 0, false) + require.NoError(t, err) + _, err = vtgateConn.ExecuteFetch(shard80MinusRoutedQuery, 0, false) + require.Error(t, err) + require.Contains(t, err.Error(), "target: customer2.80-.replica", "Query was routed to the source after partial SwitchTraffic") + _, err = vtgateConn.ExecuteFetch(shardMinus80RoutedQuery, 0, false) + require.Error(t, err) + require.Contains(t, err.Error(), "target: customer.-80.replica", "Query was routed to the target before partial SwitchTraffic") + + // We cannot Complete a partial move tables at the moment because + // it will find that all traffic has (obviously) not been switched. + err = tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionComplete, "", "", "") + require.Error(t, err) + + // Confirm shard routing rules: -80 should still be routed to customer + // while 80- should be routed to customer2. + require.Equal(t, halfCutoverShardRoutingRules, getShardRoutingRules(t)) + + // Now move the other shard: -80 + wfName = "partialDash80" + shard = "-80" + ksWf = fmt.Sprintf("%s.%s", targetKs, wfName) + + // Start the partial movetables for -80, 80- has already been switched + err = tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs, + "customer", workflowActionCreate, "", shard, "") + require.NoError(t, err) + targetTab2 := vc.getPrimaryTablet(t, targetKs, shard) + catchup(t, targetTab2, wfName, "Partial MoveTables Customer to Customer2: -80") + vdiff1(t, ksWf, "") + // Switch all traffic for the shard + require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "")) + expectedSwitchOutput = fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched.
All Writes Switched\n\n", + targetKs, wfName) + require.Equal(t, expectedSwitchOutput, lastOutput) + + // Confirm global routing rules: everything should still be routed + // to the source side, customer, globally. + confirmGlobalRoutingToSource() + + // Confirm shard routing rules: all shards should be routed to the + // target side (customer2). + require.Equal(t, postCutoverShardRoutingRules, getShardRoutingRules(t)) + + // Cancel both reverse workflows (as we've done the cutover), which should + // clean up both the global routing rules and the shard routing rules. + for _, wf := range []string{"partialDash80", "partial80Dash"} { + // We switched traffic, so it's the reverse workflow we want to cancel. + reverseWf := wf + "_reverse" + reverseKs := sourceKs // customer + err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "") + require.NoError(t, err) + + output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", reverseKs, reverseWf), "show") + require.Error(t, err) + require.Contains(t, output, "no streams found") + + // Delete the original workflow + originalKsWf := fmt.Sprintf("%s.%s", targetKs, wf) + _, err = vc.VtctlClient.ExecuteCommandWithOutput("Workflow", originalKsWf, "delete") + require.NoError(t, err) + output, err = vc.VtctlClient.ExecuteCommandWithOutput("Workflow", originalKsWf, "show") + require.Error(t, err) + require.Contains(t, output, "no streams found") + } + + // Confirm that the global routing rules are now gone. + output, err := vc.VtctlClient.ExecuteCommandWithOutput("GetRoutingRules") + require.NoError(t, err) + require.Equal(t, emptyGlobalRoutingRules, output) + + // Confirm that the shard routing rules are now gone. 
+ require.Equal(t, emptyShardRoutingRules, getShardRoutingRules(t)) +} diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 7b085a9321b..f7865dc5ad7 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -19,14 +19,12 @@ package vreplication import ( "fmt" "net" - "regexp" "strconv" "strings" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/tidwall/gjson" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" @@ -259,152 +257,6 @@ func TestBasicV2Workflows(t *testing.T) { log.Flush() } -// TestPartialMoveTables tests partial move tables by moving just one shard -// 80- from customer to customer2. -func TestPartialMoveTables(t *testing.T) { - defaultRdonly = 1 - origExtraVTGateArgs := extraVTGateArgs - // We need to enable shard routing for partial movetables routing. - // And we need to disable schema change tracking in vtgate as we want - // to test query routing using a query we know will fail as it's - // using a column that doesn't exist in the schema -- this way we - // get the target shard details back in the error message. If schema - // tracking is enabled then vtgate will produce an error about the - // unknown symbol before attempting to route the query. - extraVTGateArgs = append(extraVTGateArgs, []string{ - "--enable-partial-keyspace-migration", - "--schema_change_signal=false", - }...) - defer func() { - extraVTGateArgs = origExtraVTGateArgs - }() - vc = setupCluster(t) - defer vtgateConn.Close() - defer vc.TearDown(t) - setupCustomerKeyspace(t) - - // Move customer table from unsharded product keyspace to - // sharded customer keyspace. 
- createMoveTablesWorkflow(t, "customer") - tstWorkflowSwitchReadsAndWrites(t) - tstWorkflowComplete(t) - - // Now setup the customer2 keyspace so we can do a partial - // move tables for one of the two shards: 80-. - defaultRdonly = 0 - setupCustomer2Keyspace(t) - currentWorkflowType = wrangler.MoveTablesWorkflow - wfName := "partial" - moveToKs := "customer2" - shard := "80-" - ksWf := fmt.Sprintf("%s.%s", moveToKs, wfName) - err := tstWorkflowExec(t, defaultCellName, wfName, targetKs, moveToKs, - "customer", workflowActionCreate, "", shard, "") - require.NoError(t, err) - targetTab1 = vc.getPrimaryTablet(t, moveToKs, shard) - catchup(t, targetTab1, wfName, "Partial MoveTables Customer to Customer2") - vdiff1(t, ksWf, "") - - waitForRowCount(t, vtgateConn, "customer", "customer", 3) // customer: all shards - waitForRowCount(t, vtgateConn, "customer2", "customer", 3) // customer: all shards - waitForRowCount(t, vtgateConn, "customer2:80-", "customer", 2) // customer2: 80- - - // Remove any manually applied shard routing rules as these - // should be set by SwitchTraffic. - emptyRules := `{"rules":[]}` - applyShardRoutingRules(t, emptyRules) - require.Equal(t, emptyRules, getShardRoutingRules(t)) - - // switch all traffic - require.NoError(t, tstWorkflowExec(t, "", wfName, "", moveToKs, "", workflowActionSwitchTraffic, "", "", "")) - expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow customer2.partial\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", - shard, shard) - require.Equal(t, expectedSwitchOutput, lastOutput) - - // Confirm global routing rules -- everything should still be routed - // to the source side, customer, globally. 
- output, err := vc.VtctlClient.ExecuteCommandWithOutput("GetRoutingRules") - require.NoError(t, err) - result := gjson.Get(output, "rules") - result.ForEach(func(attributeKey, attributeValue gjson.Result) bool { - // 0 is the keyspace and 1 is optional tablename[@tablettype] - fromKsTbl := strings.Split(attributeValue.Get("fromTable").String(), ".") - // 0 is the keyspace and 1 is the tablename - toKsTbl := strings.Split(attributeValue.Get("toTables.0").String(), ".") - // All tables in the customer and customer2 keyspaces should be - // routed to the customer keyspace. - if fromKsTbl[0] == "customer" || fromKsTbl[0] == "customer2" { - require.Equal(t, "customer", toKsTbl[0]) - } - return true - }) - // Confirm shard routing rules -- all traffic for the 80- shard should be - // routed into the customer2 keyspace, overriding the global routing rules. - expectedShardRoutingRules := `{"rules":[{"from_keyspace":"customer","to_keyspace":"customer2","shard":"80-"}]}` - require.Equal(t, expectedShardRoutingRules, getShardRoutingRules(t)) - - // This query uses an ID that should always get routed to customer2:80- - targetRoutedQuery := "select name from customer where cid = 1 and noexistcol = 'foo'" - // This query uses an ID that should always get routed to customer:-80 - sourceRoutedQuery := "select name from customer where cid = 2 and noexistcol = 'foo'" - - // reset any existing vtgate connection state - vtgateConn.Close() - vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) - defer vtgateConn.Close() - - // No shard targeting - _, err = vtgateConn.ExecuteFetch(targetRoutedQuery, 0, false) - require.Error(t, err) - require.Contains(t, err.Error(), "target: customer2.80-.primary") - _, err = vtgateConn.ExecuteFetch(sourceRoutedQuery, 0, false) - require.Error(t, err) - require.Contains(t, err.Error(), "target: customer.-80.primary") - - // Shard targeting - _, err = vtgateConn.ExecuteFetch("use `customer2:80-`", 0, false) - 
require.NoError(t, err) - _, err = vtgateConn.ExecuteFetch(targetRoutedQuery, 0, false) - require.Error(t, err) - require.Contains(t, err.Error(), "target: customer2.80-.primary") - _, err = vtgateConn.ExecuteFetch("use `customer:80-`", 0, false) - require.NoError(t, err) - _, err = vtgateConn.ExecuteFetch(targetRoutedQuery, 0, false) - require.Error(t, err) - require.Contains(t, err.Error(), "target: customer2.80-.primary") - - // Tablet type targeting - _, err = vtgateConn.ExecuteFetch("use `customer2@replica`", 0, false) - require.NoError(t, err) - _, err = vtgateConn.ExecuteFetch(targetRoutedQuery, 0, false) - require.Error(t, err) - require.Contains(t, err.Error(), "target: customer2.80-.replica") - _, err = vtgateConn.ExecuteFetch(sourceRoutedQuery, 0, false) - require.Error(t, err) - require.Contains(t, err.Error(), "target: customer.-80.replica") - _, err = vtgateConn.ExecuteFetch("use `customer@replica`", 0, false) - require.NoError(t, err) - _, err = vtgateConn.ExecuteFetch(targetRoutedQuery, 0, false) - require.Error(t, err) - require.Contains(t, err.Error(), "target: customer2.80-.replica") - _, err = vtgateConn.ExecuteFetch(sourceRoutedQuery, 0, false) - require.Error(t, err) - require.Contains(t, err.Error(), "target: customer.-80.replica") - - // We cannot Complete a partial move tables at the moment because it will - // find that all traffic has (obviously) not been switched we need to - // cleanup using Workflow delete. 
- err = tstWorkflowExec(t, "", wfName, "", moveToKs, "", workflowActionComplete, "", "", "") - require.Error(t, err) - require.Equal(t, expectedShardRoutingRules, getShardRoutingRules(t)) - _, err = vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWf, "delete") - require.NoError(t, err) - output, err = vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWf, "show") - require.Error(t, err) - require.Contains(t, output, "no streams found") - -} - func getVtctldGRPCURL() string { return net.JoinHostPort("localhost", strconv.Itoa(vc.Vtctld.GrpcPort)) } @@ -416,17 +268,6 @@ func applyShardRoutingRules(t *testing.T, rules string) { require.NotNil(t, output) } -func getShardRoutingRules(t *testing.T) string { - output, err := osExec(t, "vtctldclient", []string{"--server", getVtctldGRPCURL(), "GetShardRoutingRules"}) - log.Infof("GetShardRoutingRules err: %+v, output: %+v", err, output) - require.Nilf(t, err, output) - require.NotNil(t, output) - re := regexp.MustCompile(`[\n\s]+`) - output = re.ReplaceAllString(output, "") - output = strings.TrimSpace(output) - return output -} - /* testVSchemaForSequenceAfterMoveTables checks that the related sequence tag is migrated correctly in the vschema while moving a table with an auto-increment from sharded to unsharded. 
diff --git a/go/vt/vtctl/workflow/state.go b/go/vt/vtctl/workflow/state.go index 2841cd98a1a..613f82d0b43 100644 --- a/go/vt/vtctl/workflow/state.go +++ b/go/vt/vtctl/workflow/state.go @@ -41,5 +41,7 @@ type State struct { WritesSwitched bool // Partial MoveTables info - WritesPartiallySwitched bool + IsPartialMigration bool + ShardsAlreadySwitched []string + ShardsNotYetSwitched []string } diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index c5579fc7579..fdd4b2c3007 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -227,6 +227,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil { return err } + if vschema != nil { // We added to the vschema. if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil { @@ -891,6 +892,39 @@ func getMigrationID(targetKeyspace string, shardTablets []string) (int64, error) return int64(hasher.Sum64() & math.MaxInt64), nil } +// createDefaultShardRoutingRules creates a reverse routing rule for +// each shard in a new partial keyspace migration workflow that does +// not already have an existing routing rule in place. 
+func (wr *Wrangler) createDefaultShardRoutingRules(ctx context.Context, ms *vtctldatapb.MaterializeSettings) error { + srr, err := topotools.GetShardRoutingRules(ctx, wr.ts) + if err != nil { + return err + } + allShards, err := wr.sourceTs.GetServingShards(ctx, ms.SourceKeyspace) + if err != nil { + return err + } + changed := false + for _, si := range allShards { + fromSource := fmt.Sprintf("%s.%s", ms.SourceKeyspace, si.ShardName()) + fromTarget := fmt.Sprintf("%s.%s", ms.TargetKeyspace, si.ShardName()) + if srr[fromSource] == "" && srr[fromTarget] == "" { + srr[fromTarget] = ms.SourceKeyspace + changed = true + wr.Logger().Infof("Added default shard routing rule from %q to %q", fromTarget, fromSource) + } + } + if changed { + if err := topotools.SaveShardRoutingRules(ctx, wr.ts, srr); err != nil { + return err + } + if err := wr.ts.RebuildSrvVSchema(ctx, nil); err != nil { + return err + } + } + return nil +} + func (wr *Wrangler) prepareMaterializerStreams(ctx context.Context, ms *vtctldatapb.MaterializeSettings) (*materializer, error) { if err := wr.validateNewWorkflow(ctx, ms.TargetKeyspace, ms.Workflow); err != nil { return nil, err @@ -899,6 +933,11 @@ func (wr *Wrangler) prepareMaterializerStreams(ctx context.Context, ms *vtctldat if err != nil { return nil, err } + if mz.isPartial { + if err := wr.createDefaultShardRoutingRules(ctx, ms); err != nil { + return nil, err + } + } if err := mz.deploySchema(ctx); err != nil { return nil, err } diff --git a/go/vt/wrangler/switcher.go b/go/vt/wrangler/switcher.go index 27ff564dd17..e46b62c57e8 100644 --- a/go/vt/wrangler/switcher.go +++ b/go/vt/wrangler/switcher.go @@ -40,6 +40,10 @@ func (r *switcher) deleteRoutingRules(ctx context.Context) error { return r.ts.deleteRoutingRules(ctx) } +func (r *switcher) deleteShardRoutingRules(ctx context.Context) error { + return r.ts.deleteShardRoutingRules(ctx) +} + func (r *switcher) dropSourceDeniedTables(ctx context.Context) error { return 
r.ts.dropSourceDeniedTables(ctx) } diff --git a/go/vt/wrangler/switcher_dry_run.go b/go/vt/wrangler/switcher_dry_run.go index 832f5f1917f..4c1114d6c9d 100644 --- a/go/vt/wrangler/switcher_dry_run.go +++ b/go/vt/wrangler/switcher_dry_run.go @@ -47,6 +47,13 @@ func (dr *switcherDryRun) deleteRoutingRules(ctx context.Context) error { return nil } +func (dr *switcherDryRun) deleteShardRoutingRules(ctx context.Context) error { + if dr.ts.isPartialMigration { + dr.drLog.Log("Shard routing rules for participating shards will be deleted") + } + return nil +} + func (dr *switcherDryRun) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction workflow.TrafficSwitchDirection) error { sourceShards := make([]string, 0) targetShards := make([]string, 0) diff --git a/go/vt/wrangler/switcher_interface.go b/go/vt/wrangler/switcher_interface.go index 2e400b69ac7..26bd5f53a63 100644 --- a/go/vt/wrangler/switcher_interface.go +++ b/go/vt/wrangler/switcher_interface.go @@ -50,6 +50,7 @@ type iswitcher interface { removeTargetTables(ctx context.Context) error dropTargetShards(ctx context.Context) error deleteRoutingRules(ctx context.Context) error + deleteShardRoutingRules(ctx context.Context) error addParticipatingTablesToKeyspace(ctx context.Context, keyspace, tableSpecs string) error logs() *[]string } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 9a07150c117..79a967121ae 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -211,9 +211,10 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl ws := workflow.NewServer(wr.ts, wr.tmc) state := &workflow.State{ - Workflow: workflowName, - SourceKeyspace: ts.SourceKeyspaceName(), - TargetKeyspace: targetKeyspace, + Workflow: workflowName, + SourceKeyspace: ts.SourceKeyspaceName(), + TargetKeyspace: targetKeyspace, + IsPartialMigration: ts.isPartialMigration, } var ( @@ -221,9 +222,11 
@@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl keyspace string ) - // we reverse writes by using the source_keyspace.workflowname_reverse workflow spec, so we need to use the - // source of the reverse workflow, which is the target of the workflow initiated by the user for checking routing rules - // Similarly we use a target shard of the reverse workflow as the original source to check if writes have been switched + // We reverse writes by using the source_keyspace.workflowname_reverse workflow + // spec, so we need to use the source of the reverse workflow, which is the + // target of the workflow initiated by the user for checking routing rules. + // Similarly we use a target shard of the reverse workflow as the original + // source to check if writes have been switched. if strings.HasSuffix(workflowName, "_reverse") { reverse = true keyspace = state.SourceKeyspace @@ -234,7 +237,7 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { state.WorkflowType = workflow.TypeMoveTables - // we assume a consistent state, so only choose routing rule for one table for replica/rdonly + // We assume a consistent state, so only choose routing rule for one table. 
if len(ts.Tables()) == 0 { return nil, nil, fmt.Errorf("no tables in workflow %s.%s", keyspace, workflowName) @@ -242,14 +245,17 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl table := ts.Tables()[0] if ts.isPartialMigration { // shard level traffic switching is all or nothing - shardRules, err := topotools.GetShardRoutingRules(ctx, ts.TopoServer()) + shardRoutingRules, err := wr.ts.GetShardRoutingRules(ctx) if err != nil { return nil, nil, err } - for _, sourceShard := range ts.SourceShards() { - if _, ok := shardRules[fmt.Sprintf("%s.%s", ts.sourceKeyspace, sourceShard.ShardName())]; ok { - state.WritesPartiallySwitched = true // and in effect reads are too - break + + rules := shardRoutingRules.Rules + for _, rule := range rules { + if rule.ToKeyspace == ts.SourceKeyspaceName() { + state.ShardsNotYetSwitched = append(state.ShardsNotYetSwitched, rule.Shard) + } else { + state.ShardsAlreadySwitched = append(state.ShardsAlreadySwitched, rule.Shard) } } } else { @@ -696,6 +702,9 @@ func (wr *Wrangler) dropArtifacts(ctx context.Context, keepRoutingRules bool, sw if err := sw.deleteRoutingRules(ctx); err != nil { return err } + if err := sw.deleteShardRoutingRules(ctx); err != nil { + return err + } } return nil @@ -1517,6 +1526,23 @@ func (ts *trafficSwitcher) changeShardRouting(ctx context.Context) error { return nil } +func (ts *trafficSwitcher) deleteShardRoutingRules(ctx context.Context) error { + if !ts.isPartialMigration { + return nil + } + srr, err := topotools.GetShardRoutingRules(ctx, ts.TopoServer()) + if err != nil { + return err + } + for _, si := range ts.TargetShards() { + delete(srr, fmt.Sprintf("%s.%s", ts.targetKeyspace, si.ShardName())) + } + if err := topotools.SaveShardRoutingRules(ctx, ts.TopoServer(), srr); err != nil { + return err + } + return nil +} + func (ts *trafficSwitcher) startReverseVReplication(ctx context.Context) error { return ts.ForAllSources(func(source *workflow.MigrationSource) error { 
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s", encodeString(source.GetPrimary().DbName())) diff --git a/go/vt/wrangler/workflow.go b/go/vt/wrangler/workflow.go index 4a4ca50518a..468e82f080a 100644 --- a/go/vt/wrangler/workflow.go +++ b/go/vt/wrangler/workflow.go @@ -144,7 +144,7 @@ func (vrw *VReplicationWorkflow) stateAsString(ws *workflow.State) string { if !vrw.Exists() { stateInfo = append(stateInfo, WorkflowStateNotCreated) } else { - if !vrw.ts.isPartialMigration { // shard level traffic switching is all or nothing + if !ws.IsPartialMigration { // shard level traffic switching is all or nothing if len(ws.RdonlyCellsNotSwitched) == 0 && len(ws.ReplicaCellsNotSwitched) == 0 && len(ws.ReplicaCellsSwitched) > 0 { s = "All Reads Switched" } else if len(ws.RdonlyCellsSwitched) == 0 && len(ws.ReplicaCellsSwitched) == 0 { @@ -172,21 +172,21 @@ func (vrw *VReplicationWorkflow) stateAsString(ws *workflow.State) string { } if ws.WritesSwitched { stateInfo = append(stateInfo, "Writes Switched") - } else if vrw.ts.isPartialMigration { - if ws.WritesPartiallySwitched { - // For partial migrations, the traffic switching is all or nothing - // at the shard level, so reads are effectively switched on the - // shard when writes are switched. - sourceShards := vrw.ts.SourceShards() - switchedShards := make([]string, len(sourceShards)) - for i, sourceShard := range sourceShards { - switchedShards[i] = sourceShard.ShardName() - } - stateInfo = append(stateInfo, fmt.Sprintf("Reads partially switched, for shards: %s", strings.Join(switchedShards, ","))) - stateInfo = append(stateInfo, fmt.Sprintf("Writes partially switched, for shards: %s", strings.Join(switchedShards, ","))) + } else if ws.IsPartialMigration { + // For partial migrations, the traffic switching is all or nothing + // at the shard level, so reads are effectively switched on the + // shard when writes are switched. 
+ if len(ws.ShardsAlreadySwitched) > 0 && len(ws.ShardsNotYetSwitched) > 0 { + stateInfo = append(stateInfo, fmt.Sprintf("Reads partially switched, for shards: %s", strings.Join(ws.ShardsAlreadySwitched, ","))) + stateInfo = append(stateInfo, fmt.Sprintf("Writes partially switched, for shards: %s", strings.Join(ws.ShardsAlreadySwitched, ","))) } else { - stateInfo = append(stateInfo, "Reads Not Switched") - stateInfo = append(stateInfo, "Writes Not Switched") + if len(ws.ShardsAlreadySwitched) == 0 { + stateInfo = append(stateInfo, "Reads Not Switched") + stateInfo = append(stateInfo, "Writes Not Switched") + } else { + stateInfo = append(stateInfo, "All Reads Switched") + stateInfo = append(stateInfo, "All Writes Switched") + } } } else { stateInfo = append(stateInfo, "Writes Not Switched")