Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion go/vt/workflow/resharding/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ func (hw *horizontalReshardingWorkflow) runCopySchema(ctx context.Context, t *wo
keyspace := t.Attributes["keyspace"]
sourceShard := t.Attributes["source_shard"]
destShard := t.Attributes["destination_shard"]
return hw.wr.CopySchemaShardFromShard(ctx, nil /* tableArray*/, nil /* excludeTableArray */, true, /*includeViews*/
excludeTables := strings.Split(t.Attributes["exclude_tables"], ",")
return hw.wr.CopySchemaShardFromShard(ctx, nil /* tableArray*/, excludeTables /* excludeTableArray */, true, /*includeViews*/
keyspace, sourceShard, keyspace, destShard, wrangler.DefaultWaitSlaveTimeout)
}

Expand All @@ -74,6 +75,7 @@ func (hw *horizontalReshardingWorkflow) runSplitClone(ctx context.Context, t *wo
useConsistentSnapshot := t.Attributes["use_consistent_snapshot"]

sourceKeyspaceShard := topoproto.KeyspaceShardString(keyspace, sourceShard)
excludeTables := t.Attributes["exclude_tables"]
// Reset the vtworker to avoid error if vtworker command has been called elsewhere.
// This is because vtworker class doesn't cleanup the environment after execution.
if _, err := automation.ExecuteVtworker(ctx, worker, []string{"Reset"}); err != nil {
Expand All @@ -84,6 +86,11 @@ func (hw *horizontalReshardingWorkflow) runSplitClone(ctx context.Context, t *wo
if useConsistentSnapshot != "" {
args = append(args, "--use_consistent_snapshot")
}

if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
}

_, err := automation.ExecuteVtworker(hw.ctx, worker, args)
return err
}
Expand All @@ -100,6 +107,7 @@ func (hw *horizontalReshardingWorkflow) runSplitDiff(ctx context.Context, t *wor
destinationTabletType := t.Attributes["dest_tablet_type"]
worker := t.Attributes["vtworker"]
useConsistentSnapshot := t.Attributes["use_consistent_snapshot"]
excludeTables := t.Attributes["exclude_tables"]

if _, err := automation.ExecuteVtworker(hw.ctx, worker, []string{"Reset"}); err != nil {
return err
Expand All @@ -108,6 +116,11 @@ func (hw *horizontalReshardingWorkflow) runSplitDiff(ctx context.Context, t *wor
if useConsistentSnapshot != "" {
args = append(args, "--use_consistent_snapshot")
}

if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
}

_, err := automation.ExecuteVtworker(ctx, worker, args)
return err
}
Expand Down
9 changes: 7 additions & 2 deletions go/vt/workflow/resharding/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
subFlags := flag.NewFlagSet(horizontalReshardingFactoryName, flag.ContinueOnError)
keyspace := subFlags.String("keyspace", "", "Name of keyspace to perform horizontal resharding")
vtworkersStr := subFlags.String("vtworkers", "", "A comma-separated list of vtworker addresses")
excludeTablesStr := subFlags.String("exclude_tables", "", "A comma-separated list of tables to exclude")
sourceShardsStr := subFlags.String("source_shards", "", "A comma-separated list of source shards")
destinationShardsStr := subFlags.String("destination_shards", "", "A comma-separated list of destination shards")
minHealthyRdonlyTablets := subFlags.String("min_healthy_rdonly_tablets", "1", "Minimum number of healthy RDONLY tablets required in source shards")
Expand All @@ -85,6 +86,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
}

vtworkers := strings.Split(*vtworkersStr, ",")
excludeTables := strings.Split(*excludeTablesStr, ",")
sourceShards := strings.Split(*sourceShardsStr, ",")
destinationShards := strings.Split(*destinationShardsStr, ",")
phaseEnableApprovals := parsePhaseEnableApprovals(*phaseEnableApprovalsStr)
Expand All @@ -110,7 +112,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
}

w.Name = fmt.Sprintf("Reshard shards %v into shards %v of keyspace %v.", *keyspace, *sourceShardsStr, *destinationShardsStr)
checkpoint, err := initCheckpoint(*keyspace, vtworkers, sourceShards, destinationShards, *minHealthyRdonlyTablets, *splitCmd, *splitDiffDestTabletType, useConsistentSnapshotArg)
checkpoint, err := initCheckpoint(*keyspace, vtworkers, excludeTables, sourceShards, destinationShards, *minHealthyRdonlyTablets, *splitCmd, *splitDiffDestTabletType, useConsistentSnapshotArg)
if err != nil {
return err
}
Expand Down Expand Up @@ -269,13 +271,14 @@ func validateWorkflow(m *workflow.Manager, keyspace string, vtworkers, sourceSha
}

// initCheckpoint initialize the checkpoint for the horizontal workflow.
func initCheckpoint(keyspace string, vtworkers, sourceShards, destinationShards []string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType string, useConsistentSnapshot string) (*workflowpb.WorkflowCheckpoint, error) {
func initCheckpoint(keyspace string, vtworkers, excludeTables, sourceShards, destinationShards []string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType string, useConsistentSnapshot string) (*workflowpb.WorkflowCheckpoint, error) {
tasks := make(map[string]*workflowpb.Task)
initTasks(tasks, phaseCopySchema, destinationShards, func(i int, shard string) map[string]string {
return map[string]string{
"keyspace": keyspace,
"source_shard": sourceShards[0],
"destination_shard": shard,
"exclude_tables": strings.Join(excludeTables, ","),
}
})
initTasks(tasks, phaseClone, sourceShards, func(i int, shard string) map[string]string {
Expand All @@ -286,6 +289,7 @@ func initCheckpoint(keyspace string, vtworkers, sourceShards, destinationShards
"split_cmd": splitCmd,
"vtworker": vtworkers[i],
"use_consistent_snapshot": useConsistentSnapshot,
"exclude_tables": strings.Join(excludeTables, ","),
}
})
initTasks(tasks, phaseWaitForFilteredReplication, destinationShards, func(i int, shard string) map[string]string {
Expand All @@ -301,6 +305,7 @@ func initCheckpoint(keyspace string, vtworkers, sourceShards, destinationShards
"dest_tablet_type": splitDiffDestTabletType,
"vtworker": vtworkers[i],
"use_consistent_snapshot": useConsistentSnapshot,
"exclude_tables": strings.Join(excludeTables, ","),
}
})
initTasks(tasks, phaseMigrateRdonly, sourceShards, func(i int, shard string) map[string]string {
Expand Down
40 changes: 27 additions & 13 deletions go/vt/workflow/resharding/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestSourceDestShards(t *testing.T) {
defer ctrl.Finish()

// Set up the fakeworkerclient. It is used at SplitClone and SplitDiff phase.
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, false)
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, false, "")
vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory)
defer vtworkerclient.UnregisterFactoryForTest("fake")

Expand Down Expand Up @@ -90,23 +90,28 @@ func TestSourceDestShards(t *testing.T) {

// TestHorizontalResharding runs the happy path of HorizontalReshardingWorkflow.
func TestHorizontalResharding(t *testing.T) {
testHorizontalReshardingWorkflow(t, false)
testHorizontalReshardingWorkflow(t, false, "")
}

// TestHorizontalReshardingWithConsistentSnapshot runs the happy path of HorizontalReshardingWorkflow with consistent snapshot.
func TestHorizontalReshardingWithConsistentSnapshot(t *testing.T) {
testHorizontalReshardingWorkflow(t, true)
testHorizontalReshardingWorkflow(t, true, "")
}

func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool) {
// TestHorizontalReshardingWithExcludedTables runs the happy path of HorizontalReshardingWorkflow with excluded tables.
func TestHorizontalReshardingWithExcludedTables(t *testing.T) {
testHorizontalReshardingWorkflow(t, true, "table_a,table_b")
}

func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool, excludeTables string) {
ctx := context.Background()
// Set up the mock wrangler. It is used for the CopySchema,
// WaitforFilteredReplication and Migrate phase.
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockWranglerInterface := setupMockWrangler(ctrl, testKeyspace)
// Set up the fakeworkerclient. It is used at SplitClone and SplitDiff phase.
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, useConsistentSnapshot)
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, useConsistentSnapshot, excludeTables)
vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory)
defer vtworkerclient.UnregisterFactoryForTest("fake")
// Initialize the topology.
Expand All @@ -120,6 +125,9 @@ func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool)
if useConsistentSnapshot {
args = append(args, "-use_consistent_snapshot")
}
if excludeTables != "" {
args = append(args, "-exclude_tables="+excludeTables)
}
uuid, err := m.Create(ctx, horizontalReshardingFactoryName, args)
if err != nil {
t.Fatalf("cannot create resharding workflow: %v", err)
Expand Down Expand Up @@ -148,43 +156,49 @@ func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool)
wg.Wait()
}

func setupFakeVtworker(keyspace, vtworkers string, useConsistentSnapshot bool) *fakevtworkerclient.FakeVtworkerClient {
func setupFakeVtworker(keyspace, vtworkers string, useConsistentSnapshot bool, excludeTables string) *fakevtworkerclient.FakeVtworkerClient {
flag.Set("vtworker_client_protocol", "fake")
fakeVtworkerClient := fakevtworkerclient.NewFakeVtworkerClient()
fakeVtworkerClient.RegisterResultForAddr(vtworkers, resetCommand(), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitCloneCommand(keyspace, useConsistentSnapshot), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitCloneCommand(keyspace, useConsistentSnapshot, excludeTables), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, resetCommand(), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "-80", useConsistentSnapshot), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "-80", useConsistentSnapshot, excludeTables), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, resetCommand(), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "80-", useConsistentSnapshot), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "80-", useConsistentSnapshot, excludeTables), "", nil)
return fakeVtworkerClient
}

func resetCommand() []string {
return []string{"Reset"}
}

func splitCloneCommand(keyspace string, useConsistentSnapshot bool) []string {
func splitCloneCommand(keyspace string, useConsistentSnapshot bool, excludeTables string) []string {
args := []string{"SplitClone", "--min_healthy_rdonly_tablets=2", keyspace + "/0"}
if useConsistentSnapshot {
args = append(args, "--use_consistent_snapshot")
}
if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
}
return args
}

func splitDiffCommand(keyspace string, shardId string, useConsistentSnapshot bool) []string {
func splitDiffCommand(keyspace string, shardId string, useConsistentSnapshot bool, excludeTables string) []string {
args := []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "--dest_tablet_type=RDONLY", keyspace + "/" + shardId}
if useConsistentSnapshot {
args = append(args, "--use_consistent_snapshot")
}
if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
}
return args
}

func setupMockWrangler(ctrl *gomock.Controller, keyspace string) *MockReshardingWrangler {
mockWranglerInterface := NewMockReshardingWrangler(ctrl)
// Set the expected behaviors for mock wrangler.
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "-80", wrangler.DefaultWaitSlaveTimeout).Return(nil)
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "80-", wrangler.DefaultWaitSlaveTimeout).Return(nil)
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, gomock.Any() /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "-80", wrangler.DefaultWaitSlaveTimeout).Return(nil)
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, gomock.Any() /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "80-", wrangler.DefaultWaitSlaveTimeout).Return(nil)

mockWranglerInterface.EXPECT().WaitForFilteredReplication(gomock.Any(), keyspace, "-80", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil)
mockWranglerInterface.EXPECT().WaitForFilteredReplication(gomock.Any(), keyspace, "80-", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil)
Expand Down
7 changes: 6 additions & 1 deletion go/vt/workflow/reshardingworkflowgen/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
subFlags := flag.NewFlagSet(keyspaceReshardingFactoryName, flag.ContinueOnError)
keyspace := subFlags.String("keyspace", "", "Name of keyspace to perform horizontal resharding")
vtworkersStr := subFlags.String("vtworkers", "", "A comma-separated list of vtworker addresses")
excludeTablesStr := subFlags.String("exclude_tables", "", "A comma-separated list of tables to exclude")
minHealthyRdonlyTablets := subFlags.String("min_healthy_rdonly_tablets", "1", "Minimum number of healthy RDONLY tablets required in source shards")
splitCmd := subFlags.String("split_cmd", "SplitClone", "Split command to use to perform horizontal resharding (either SplitClone or LegacySplitClone)")
splitDiffDestTabletType := subFlags.String("split_diff_dest_tablet_type", "RDONLY", "Specifies tablet type to use in destination shards while performing SplitDiff operation")
Expand All @@ -74,6 +75,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
}

vtworkers := strings.Split(*vtworkersStr, ",")
excludeTables := strings.Split(*excludeTablesStr, ",")

w.Name = fmt.Sprintf("Keyspace reshard on %s", *keyspace)
shardsToSplit, err := findSourceAndDestinationShards(m.TopoServer(), *keyspace)
Expand All @@ -84,6 +86,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
checkpoint, err := initCheckpoint(
*keyspace,
vtworkers,
excludeTables,
shardsToSplit,
*minHealthyRdonlyTablets,
*splitCmd,
Expand Down Expand Up @@ -191,7 +194,7 @@ func findSourceAndDestinationShards(ts *topo.Server, keyspace string) ([][][]str
}

// initCheckpoint initialize the checkpoint for keyspace reshard
func initCheckpoint(keyspace string, vtworkers []string, shardsToSplit [][][]string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType, phaseEnableApprovals string, skipStartWorkflows bool, useConsistentSnapshot bool) (*workflowpb.WorkflowCheckpoint, error) {
func initCheckpoint(keyspace string, vtworkers, excludeTables []string, shardsToSplit [][][]string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType, phaseEnableApprovals string, skipStartWorkflows bool, useConsistentSnapshot bool) (*workflowpb.WorkflowCheckpoint, error) {
sourceShards := 0
destShards := 0
for _, shardToSplit := range shardsToSplit {
Expand Down Expand Up @@ -238,6 +241,7 @@ func initCheckpoint(keyspace string, vtworkers []string, shardsToSplit [][][]str
"workflows_count": fmt.Sprintf("%v", len(shardsToSplit)),
"keyspace": keyspace,
"use_consistent_snapshot": fmt.Sprintf("%v", useConsistentSnapshot),
"exclude_tables": fmt.Sprintf("%v", strings.Join(excludeTables, ",")),
},
}, nil
}
Expand Down Expand Up @@ -301,6 +305,7 @@ func (hw *reshardingWorkflowGen) workflowCreator(ctx context.Context, task *work
horizontalReshardingParams := []string{
"-keyspace=" + hw.keyspaceParam,
"-vtworkers=" + task.Attributes["vtworkers"],
"-exclude_tables=" + task.Attributes["exclude_tables"],
"-split_cmd=" + hw.splitCmdParam,
"-split_diff_dest_tablet_type=" + hw.splitDiffDestTabletTypeParam,
"-min_healthy_rdonly_tablets=" + hw.minHealthyRdonlyTabletsParam,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestWorfklowGenerator(t *testing.T) {
workflow.StartManager(m)
// Create the workflow.
vtworkersParameter := testVtworkers + "," + testVtworkers
uuid, err := m.Create(ctx, keyspaceReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-min_healthy_rdonly_tablets=2", "-use_consistent_snapshot=true"})
uuid, err := m.Create(ctx, keyspaceReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-min_healthy_rdonly_tablets=2", "-use_consistent_snapshot=true", "-exclude_tables=table_a,table_b"})
if err != nil {
t.Fatalf("cannot create resharding workflow: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion web/vtctld2/app/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@
</head>
<body class="flex-column">
<vt-app-root class="flex-column flex-grow">Loading...</vt-app-root>
<script type="text/javascript" src="inline.js"></script><script type="text/javascript" src="styles.38b88af69dfd283498eb.bundle.js"></script><script type="text/javascript" src="main.f08922949ce1705e18fe.bundle.js"></script></body>
<script type="text/javascript" src="inline.js"></script><script type="text/javascript" src="styles.38b88af69dfd283498eb.bundle.js"></script><script type="text/javascript" src="main.38a1a560f8f628e31552.bundle.js"></script></body>
</html>
Loading