diff --git a/go/vt/workflow/resharding/tasks.go b/go/vt/workflow/resharding/tasks.go index c5fc64aa357..7ad4d04ad2f 100644 --- a/go/vt/workflow/resharding/tasks.go +++ b/go/vt/workflow/resharding/tasks.go @@ -61,7 +61,8 @@ func (hw *horizontalReshardingWorkflow) runCopySchema(ctx context.Context, t *wo keyspace := t.Attributes["keyspace"] sourceShard := t.Attributes["source_shard"] destShard := t.Attributes["destination_shard"] - return hw.wr.CopySchemaShardFromShard(ctx, nil /* tableArray*/, nil /* excludeTableArray */, true, /*includeViews*/ + excludeTables := strings.Split(t.Attributes["exclude_tables"], ",") + return hw.wr.CopySchemaShardFromShard(ctx, nil /* tableArray*/, excludeTables /* excludeTableArray */, true, /*includeViews*/ keyspace, sourceShard, keyspace, destShard, wrangler.DefaultWaitSlaveTimeout) } @@ -74,6 +75,7 @@ func (hw *horizontalReshardingWorkflow) runSplitClone(ctx context.Context, t *wo useConsistentSnapshot := t.Attributes["use_consistent_snapshot"] sourceKeyspaceShard := topoproto.KeyspaceShardString(keyspace, sourceShard) + excludeTables := t.Attributes["exclude_tables"] // Reset the vtworker to avoid error if vtworker command has been called elsewhere. // This is because vtworker class doesn't cleanup the environment after execution. if _, err := automation.ExecuteVtworker(ctx, worker, []string{"Reset"}); err != nil { @@ -84,6 +86,11 @@ func (hw *horizontalReshardingWorkflow) runSplitClone(ctx context.Context, t *wo if useConsistentSnapshot != "" { args = append(args, "--use_consistent_snapshot") } + + if excludeTables != "" { + args = append(args, "--exclude_tables="+excludeTables) + } + _, err := automation.ExecuteVtworker(hw.ctx, worker, args) return err } @@ -100,6 +107,7 @@ func (hw *horizontalReshardingWorkflow) runSplitDiff(ctx context.Context, t *wor destinationTabletType := t.Attributes["dest_tablet_type"] worker := t.Attributes["vtworker"] useConsistentSnapshot := t.Attributes["use_consistent_snapshot"] + excludeTables := t.Attributes["exclude_tables"] if _, err := automation.ExecuteVtworker(hw.ctx, worker, []string{"Reset"}); err != nil { return err @@ -108,6 +116,11 @@ func (hw *horizontalReshardingWorkflow) runSplitDiff(ctx context.Context, t *wor if useConsistentSnapshot != "" { args = append(args, "--use_consistent_snapshot") } + + if excludeTables != "" { + args = append(args, "--exclude_tables="+excludeTables) + } + _, err := automation.ExecuteVtworker(ctx, worker, args) return err } diff --git a/go/vt/workflow/resharding/workflow.go b/go/vt/workflow/resharding/workflow.go index dedd4f7989b..8f714afb690 100644 --- a/go/vt/workflow/resharding/workflow.go +++ b/go/vt/workflow/resharding/workflow.go @@ -68,6 +68,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string) subFlags := flag.NewFlagSet(horizontalReshardingFactoryName, flag.ContinueOnError) keyspace := subFlags.String("keyspace", "", "Name of keyspace to perform horizontal resharding") vtworkersStr := subFlags.String("vtworkers", "", "A comma-separated list of vtworker addresses") + excludeTablesStr := subFlags.String("exclude_tables", "", "A comma-separated list of tables to exclude") sourceShardsStr := subFlags.String("source_shards", "", "A comma-separated list of source shards") destinationShardsStr := subFlags.String("destination_shards", "", "A comma-separated list of destination shards") minHealthyRdonlyTablets := subFlags.String("min_healthy_rdonly_tablets", "1", "Minimum number of healthy RDONLY tablets required in source shards") @@ -85,6 +86,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string) } vtworkers := strings.Split(*vtworkersStr, ",") + excludeTables := strings.Split(*excludeTablesStr, ",") sourceShards := strings.Split(*sourceShardsStr, ",") destinationShards := strings.Split(*destinationShardsStr, ",") phaseEnableApprovals := parsePhaseEnableApprovals(*phaseEnableApprovalsStr) @@ -110,7 +112,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string) } w.Name = fmt.Sprintf("Reshard shards %v into shards %v of keyspace %v.", *keyspace, *sourceShardsStr, *destinationShardsStr) - checkpoint, err := initCheckpoint(*keyspace, vtworkers, sourceShards, destinationShards, *minHealthyRdonlyTablets, *splitCmd, *splitDiffDestTabletType, useConsistentSnapshotArg) + checkpoint, err := initCheckpoint(*keyspace, vtworkers, excludeTables, sourceShards, destinationShards, *minHealthyRdonlyTablets, *splitCmd, *splitDiffDestTabletType, useConsistentSnapshotArg) if err != nil { return err } @@ -269,13 +271,14 @@ func validateWorkflow(m *workflow.Manager, keyspace string, vtworkers, sourceSha } // initCheckpoint initialize the checkpoint for the horizontal workflow. -func initCheckpoint(keyspace string, vtworkers, sourceShards, destinationShards []string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType string, useConsistentSnapshot string) (*workflowpb.WorkflowCheckpoint, error) { +func initCheckpoint(keyspace string, vtworkers, excludeTables, sourceShards, destinationShards []string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType string, useConsistentSnapshot string) (*workflowpb.WorkflowCheckpoint, error) { tasks := make(map[string]*workflowpb.Task) initTasks(tasks, phaseCopySchema, destinationShards, func(i int, shard string) map[string]string { return map[string]string{ "keyspace": keyspace, "source_shard": sourceShards[0], "destination_shard": shard, + "exclude_tables": strings.Join(excludeTables, ","), } }) initTasks(tasks, phaseClone, sourceShards, func(i int, shard string) map[string]string { @@ -286,6 +289,7 @@ func initCheckpoint(keyspace string, vtworkers, sourceShards, destinationShards "split_cmd": splitCmd, "vtworker": vtworkers[i], "use_consistent_snapshot": useConsistentSnapshot, + "exclude_tables": strings.Join(excludeTables, ","), } }) initTasks(tasks, phaseWaitForFilteredReplication, destinationShards, func(i int, shard string) map[string]string { @@ -301,6 +305,7 @@ func initCheckpoint(keyspace string, vtworkers, sourceShards, destinationShards "dest_tablet_type": splitDiffDestTabletType, "vtworker": vtworkers[i], "use_consistent_snapshot": useConsistentSnapshot, + "exclude_tables": strings.Join(excludeTables, ","), } }) initTasks(tasks, phaseMigrateRdonly, sourceShards, func(i int, shard string) map[string]string { diff --git a/go/vt/workflow/resharding/workflow_test.go b/go/vt/workflow/resharding/workflow_test.go index bc3d7478240..2af2703d50b 100644 --- a/go/vt/workflow/resharding/workflow_test.go +++ b/go/vt/workflow/resharding/workflow_test.go @@ -55,7 +55,7 @@ func TestSourceDestShards(t *testing.T) { defer ctrl.Finish() // Set up the fakeworkerclient. It is used at SplitClone and SplitDiff phase. - fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, false) + fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, false, "") vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory) defer vtworkerclient.UnregisterFactoryForTest("fake") @@ -90,15 +90,20 @@ func TestSourceDestShards(t *testing.T) { // TestHorizontalResharding runs the happy path of HorizontalReshardingWorkflow. func TestHorizontalResharding(t *testing.T) { - testHorizontalReshardingWorkflow(t, false) + testHorizontalReshardingWorkflow(t, false, "") } // TestHorizontalReshardingWithConsistentSnapshot runs the happy path of HorizontalReshardingWorkflow with consistent snapshot. func TestHorizontalReshardingWithConsistentSnapshot(t *testing.T) { - testHorizontalReshardingWorkflow(t, true) + testHorizontalReshardingWorkflow(t, true, "") } -func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool) { +// TestHorizontalReshardingWithExcludedTables runs the happy path of HorizontalReshardingWorkflow with excluded tables. +func TestHorizontalReshardingWithExcludedTables(t *testing.T) { + testHorizontalReshardingWorkflow(t, true, "table_a,table_b") +} + +func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool, excludeTables string) { ctx := context.Background() // Set up the mock wrangler. It is used for the CopySchema, // WaitforFilteredReplication and Migrate phase. @@ -106,7 +111,7 @@ func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool) defer ctrl.Finish() mockWranglerInterface := setupMockWrangler(ctrl, testKeyspace) // Set up the fakeworkerclient. It is used at SplitClone and SplitDiff phase. - fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, useConsistentSnapshot) + fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, useConsistentSnapshot, excludeTables) vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory) defer vtworkerclient.UnregisterFactoryForTest("fake") // Initialize the topology. @@ -120,6 +125,9 @@ func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool) if useConsistentSnapshot { args = append(args, "-use_consistent_snapshot") } + if excludeTables != "" { + args = append(args, "-exclude_tables="+excludeTables) + } uuid, err := m.Create(ctx, horizontalReshardingFactoryName, args) if err != nil { t.Fatalf("cannot create resharding workflow: %v", err) @@ -148,15 +156,15 @@ func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool) wg.Wait() } -func setupFakeVtworker(keyspace, vtworkers string, useConsistentSnapshot bool) *fakevtworkerclient.FakeVtworkerClient { +func setupFakeVtworker(keyspace, vtworkers string, useConsistentSnapshot bool, excludeTables string) *fakevtworkerclient.FakeVtworkerClient { flag.Set("vtworker_client_protocol", "fake") fakeVtworkerClient := fakevtworkerclient.NewFakeVtworkerClient() fakeVtworkerClient.RegisterResultForAddr(vtworkers, resetCommand(), "", nil) - fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitCloneCommand(keyspace, useConsistentSnapshot), "", nil) + fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitCloneCommand(keyspace, useConsistentSnapshot, excludeTables), "", nil) fakeVtworkerClient.RegisterResultForAddr(vtworkers, resetCommand(), "", nil) - fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "-80", useConsistentSnapshot), "", nil) + fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "-80", useConsistentSnapshot, excludeTables), "", nil) fakeVtworkerClient.RegisterResultForAddr(vtworkers, resetCommand(), "", nil) - fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "80-", useConsistentSnapshot), "", nil) + fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "80-", useConsistentSnapshot, excludeTables), "", nil) return fakeVtworkerClient } @@ -164,27 +172,33 @@ func resetCommand() []string { return []string{"Reset"} } -func splitCloneCommand(keyspace string, useConsistentSnapshot bool) []string { +func splitCloneCommand(keyspace string, useConsistentSnapshot bool, excludeTables string) []string { args := []string{"SplitClone", "--min_healthy_rdonly_tablets=2", keyspace + "/0"} if useConsistentSnapshot { args = append(args, "--use_consistent_snapshot") } + if excludeTables != "" { + args = append(args, "--exclude_tables="+excludeTables) + } return args } -func splitDiffCommand(keyspace string, shardId string, useConsistentSnapshot bool) []string { +func splitDiffCommand(keyspace string, shardId string, useConsistentSnapshot bool, excludeTables string) []string { args := []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "--dest_tablet_type=RDONLY", keyspace + "/" + shardId} if useConsistentSnapshot { args = append(args, "--use_consistent_snapshot") } + if excludeTables != "" { + args = append(args, "--exclude_tables="+excludeTables) + } return args } func setupMockWrangler(ctrl *gomock.Controller, keyspace string) *MockReshardingWrangler { mockWranglerInterface := NewMockReshardingWrangler(ctrl) // Set the expected behaviors for mock wrangler. - mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "-80", wrangler.DefaultWaitSlaveTimeout).Return(nil) - mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "80-", wrangler.DefaultWaitSlaveTimeout).Return(nil) + mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, gomock.Any() /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "-80", wrangler.DefaultWaitSlaveTimeout).Return(nil) + mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, gomock.Any() /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "80-", wrangler.DefaultWaitSlaveTimeout).Return(nil) mockWranglerInterface.EXPECT().WaitForFilteredReplication(gomock.Any(), keyspace, "-80", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil) mockWranglerInterface.EXPECT().WaitForFilteredReplication(gomock.Any(), keyspace, "80-", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil) diff --git a/go/vt/workflow/reshardingworkflowgen/workflow.go b/go/vt/workflow/reshardingworkflowgen/workflow.go index 43c48cda772..4a6cee33835 100644 --- a/go/vt/workflow/reshardingworkflowgen/workflow.go +++ b/go/vt/workflow/reshardingworkflowgen/workflow.go @@ -58,6 +58,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string) subFlags := flag.NewFlagSet(keyspaceReshardingFactoryName, flag.ContinueOnError) keyspace := subFlags.String("keyspace", "", "Name of keyspace to perform horizontal resharding") vtworkersStr := subFlags.String("vtworkers", "", "A comma-separated list of vtworker addresses") + excludeTablesStr := subFlags.String("exclude_tables", "", "A comma-separated list of tables to exclude") minHealthyRdonlyTablets := subFlags.String("min_healthy_rdonly_tablets", "1", "Minimum number of healthy RDONLY tablets required in source shards") splitCmd := subFlags.String("split_cmd", "SplitClone", "Split command to use to perform horizontal resharding (either SplitClone or LegacySplitClone)") splitDiffDestTabletType := subFlags.String("split_diff_dest_tablet_type", "RDONLY", "Specifies tablet type to use in destination shards while performing SplitDiff operation") @@ -74,6 +75,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string) } vtworkers := strings.Split(*vtworkersStr, ",") + excludeTables := strings.Split(*excludeTablesStr, ",") w.Name = fmt.Sprintf("Keyspace reshard on %s", *keyspace) shardsToSplit, err := findSourceAndDestinationShards(m.TopoServer(), *keyspace) @@ -84,6 +86,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string) checkpoint, err := initCheckpoint( *keyspace, vtworkers, + excludeTables, shardsToSplit, *minHealthyRdonlyTablets, *splitCmd, @@ -191,7 +194,7 @@ func findSourceAndDestinationShards(ts *topo.Server, keyspace string) ([][][]str } // initCheckpoint initialize the checkpoint for keyspace reshard -func initCheckpoint(keyspace string, vtworkers []string, shardsToSplit [][][]string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType, phaseEnableApprovals string, skipStartWorkflows bool, useConsistentSnapshot bool) (*workflowpb.WorkflowCheckpoint, error) { +func initCheckpoint(keyspace string, vtworkers, excludeTables []string, shardsToSplit [][][]string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType, phaseEnableApprovals string, skipStartWorkflows bool, useConsistentSnapshot bool) (*workflowpb.WorkflowCheckpoint, error) { sourceShards := 0 destShards := 0 for _, shardToSplit := range shardsToSplit { @@ -238,6 +241,7 @@ func initCheckpoint(keyspace string, vtworkers []string, shardsToSplit [][][]str "workflows_count": fmt.Sprintf("%v", len(shardsToSplit)), "keyspace": keyspace, "use_consistent_snapshot": fmt.Sprintf("%v", useConsistentSnapshot), + "exclude_tables": fmt.Sprintf("%v", strings.Join(excludeTables, ",")), }, }, nil } @@ -301,6 +305,7 @@ func (hw *reshardingWorkflowGen) workflowCreator(ctx context.Context, task *work horizontalReshardingParams := []string{ "-keyspace=" + hw.keyspaceParam, "-vtworkers=" + task.Attributes["vtworkers"], + "-exclude_tables=" + task.Attributes["exclude_tables"], "-split_cmd=" + hw.splitCmdParam, "-split_diff_dest_tablet_type=" + hw.splitDiffDestTabletTypeParam, "-min_healthy_rdonly_tablets=" + hw.minHealthyRdonlyTabletsParam, diff --git a/go/vt/workflow/reshardingworkflowgen/workflow_flaky_test.go b/go/vt/workflow/reshardingworkflowgen/workflow_flaky_test.go index bdb4ffc5fe8..d929b8daf39 100644 --- a/go/vt/workflow/reshardingworkflowgen/workflow_flaky_test.go +++ b/go/vt/workflow/reshardingworkflowgen/workflow_flaky_test.go @@ -53,7 +53,7 @@ func TestWorfklowGenerator(t *testing.T) { workflow.StartManager(m) // Create the workflow. vtworkersParameter := testVtworkers + "," + testVtworkers - uuid, err := m.Create(ctx, keyspaceReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-min_healthy_rdonly_tablets=2", "-use_consistent_snapshot=true"}) + uuid, err := m.Create(ctx, keyspaceReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-min_healthy_rdonly_tablets=2", "-use_consistent_snapshot=true", "-exclude_tables=table_a,table_b"}) if err != nil { t.Fatalf("cannot create resharding workflow: %v", err) } diff --git a/web/vtctld2/app/index.html b/web/vtctld2/app/index.html index 70ef85f14ed..0c7fa761667 100644 --- a/web/vtctld2/app/index.html +++ b/web/vtctld2/app/index.html @@ -27,5 +27,5 @@