Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 38 additions & 23 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ var commands = []commandGroup{
"<from_keyspace> <to_keyspace> <tables>",
"Start the VerticalSplitClone process to perform vertical resharding. Example: SplitClone from_ks to_ks 'a,/b.*/'"},
{"VDiff", commandVDiff,
"-workflow=<workflow> <target keyspace> [-source_cell=<cell>] [-target_cell=<cell>] [-tablet_types=REPLICA] [-filtered_replication_wait_time=30s]",
"[-source_cell=<cell>] [-target_cell=<cell>] [-tablet_types=replica] [-filtered_replication_wait_time=30s] <keyspace.workflow>",
"Perform a diff of all tables in the workflow"},
{"MigrateServedTypes", commandMigrateServedTypes,
"[-cells=c1,c2,...] [-reverse] [-skip-refresh-state] <keyspace/shard> <served tablet type>",
Expand All @@ -323,10 +323,10 @@ var commands = []commandGroup{
"[-cells=c1,c2,...] [-reverse] <destination keyspace/shard> <served tablet type>",
"Makes the <destination keyspace/shard> serve the given type. This command also rebuilds the serving graph."},
{"MigrateReads", commandMigrateReads,
"[-cells=c1,c2,...] [-reverse] -workflow=workflow <target keyspace> <tablet type>",
"[-cells=c1,c2,...] [-reverse] -tablet_type={replica|rdonly} <keyspace.workflow>",
"Migrate read traffic for the specified workflow."},
{"MigrateWrites", commandMigrateWrites,
"[-filtered_replication_wait_time=30s] [-cancel] [-reverse_replication=false] -workflow=workflow <target keyspace>",
"[-filtered_replication_wait_time=30s] [-cancel] [-reverse_replication=false] <keyspace.workflow>",
"Migrate write traffic for the specified workflow."},
{"CancelResharding", commandCancelResharding,
"<keyspace/shard>",
Expand Down Expand Up @@ -1811,24 +1811,35 @@ func commandVerticalSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFl
}

func commandVDiff(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
workflow := subFlags.String("workflow", "", "Specifies the workflow name")
sourceCell := subFlags.String("source_cell", "", "The source cell to compare from")
targetCell := subFlags.String("target_cell", "", "The target cell to compare with")
tabletTypes := subFlags.String("tablet_types", "", "Tablet types for source and target")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be aborted on timeout.")
if err := subFlags.Parse(args); err != nil {
return err
}

if subFlags.NArg() != 1 {
return fmt.Errorf("the <target keyspace> is required")
return fmt.Errorf("<keyspace.workflow> is required")
}
keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0))
if err != nil {
return err
}

targetKeyspace := subFlags.Arg(0)
_, err := wr.VDiff(ctx, targetKeyspace, *workflow, *sourceCell, *targetCell, *tabletTypes, *filteredReplicationWaitTime,
_, err = wr.VDiff(ctx, keyspace, workflow, *sourceCell, *targetCell, *tabletTypes, *filteredReplicationWaitTime,
*HealthCheckTopologyRefresh, *HealthcheckRetryDelay, *HealthCheckTimeout)
return err
}

func splitKeyspaceWorkflow(in string) (keyspace, workflow string, err error) {
splits := strings.Split(in, ".")
if len(splits) != 2 {
return "", "", fmt.Errorf("invalid format for <keyspace.workflow>: %s", in)
}
return splits[0], splits[1], nil
}

func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update")
reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward.")
Expand Down Expand Up @@ -1889,16 +1900,15 @@ func commandMigrateServedFrom(ctx context.Context, wr *wrangler.Wrangler, subFla
func commandMigrateReads(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward.")
cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update")
workflow := subFlags.String("workflow", "", "Specifies the workflow name")
tabletType := subFlags.String("tablet_type", "", "Tablet type (replica or rdonly)")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 2 {
return fmt.Errorf("the <target keyspace> and <tablet type> arguments are required for the MigrateReads command")
}

keyspace := subFlags.Arg(0)
servedType, err := parseTabletType(subFlags.Arg(2), []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY})
if *tabletType == "" {
return fmt.Errorf("-tablet_type must be specified")
}
servedType, err := parseTabletType(*tabletType, []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY})
if err != nil {
return err
}
Expand All @@ -1910,29 +1920,34 @@ func commandMigrateReads(ctx context.Context, wr *wrangler.Wrangler, subFlags *f
if *reverse {
direction = wrangler.DirectionBackward
}
if *workflow == "" {
return fmt.Errorf("a -workflow=workflow argument is required")
if subFlags.NArg() != 1 {
return fmt.Errorf("<keyspace.workflow> is required")
}
keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0))
if err != nil {
return err
}
return wr.MigrateReads(ctx, keyspace, *workflow, servedType, cells, direction)

return wr.MigrateReads(ctx, keyspace, workflow, servedType, cells, direction)
}

func commandMigrateWrites(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be aborted on timeout.")
reverseReplication := subFlags.Bool("reverse_replication", true, "Also reverse the replication")
cancelMigrate := subFlags.Bool("cancel", false, "Cancel the failed migration and serve from source")
workflow := subFlags.String("workflow", "", "Specifies the workflow name")
if err := subFlags.Parse(args); err != nil {
return err
}

if subFlags.NArg() != 1 {
return fmt.Errorf("the <target keyspace> argument is required for the MigrateWrites command")
return fmt.Errorf("<keyspace.workflow> is required")
}

keyspace := subFlags.Arg(0)
if *workflow == "" {
return fmt.Errorf("a -workflow=workflow argument is required")
keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0))
if err != nil {
return err
}
journalID, err := wr.MigrateWrites(ctx, keyspace, *workflow, *filteredReplicationWaitTime, *cancelMigrate, *reverseReplication)

journalID, err := wr.MigrateWrites(ctx, keyspace, workflow, *filteredReplicationWaitTime, *cancelMigrate, *reverseReplication)
if err != nil {
return err
}
Expand Down