Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion go/vt/workflow/reshardingworkflowgen/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
skipStartWorkflows := subFlags.Bool("skip_start_workflows", true, "If true, newly created workflows will have skip_start set")
phaseEnableApprovalsDesc := fmt.Sprintf("Comma separated phases that require explicit approval in the UI to execute. Phase names are: %v", strings.Join(resharding.WorkflowPhases(), ","))
phaseEnableApprovalsStr := subFlags.String("phase_enable_approvals", strings.Join(resharding.WorkflowPhases(), ","), phaseEnableApprovalsDesc)
useConsistentSnapshot := subFlags.Bool("use_consistent_snapshot", false, "Instead of pausing replication on the source, uses transactions with consistent snapshot to have a stable view of the data.")

if err := subFlags.Parse(args); err != nil {
return err
Expand All @@ -89,6 +90,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
*splitDiffDestTabletType,
*phaseEnableApprovalsStr,
*skipStartWorkflows,
*useConsistentSnapshot,
)
if err != nil {
return err
Expand Down Expand Up @@ -127,6 +129,7 @@ func (*Factory) Instantiate(m *workflow.Manager, w *workflowpb.Workflow, rootNod
keyspaceParam: checkpoint.Settings["keyspace"],
splitDiffDestTabletTypeParam: checkpoint.Settings["split_diff_dest_tablet_type"],
splitCmdParam: checkpoint.Settings["split_cmd"],
useConsistentSnapshot: checkpoint.Settings["use_consistent_snapshot"],
workflowsCount: workflowsCount,
}
createWorkflowsUINode := &workflow.Node{
Expand Down Expand Up @@ -188,7 +191,7 @@ func findSourceAndDestinationShards(ts *topo.Server, keyspace string) ([][][]str
}

// initCheckpoint initialize the checkpoint for keyspace reshard
func initCheckpoint(keyspace string, vtworkers []string, shardsToSplit [][][]string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType, phaseEnableApprovals string, skipStartWorkflows bool) (*workflowpb.WorkflowCheckpoint, error) {
func initCheckpoint(keyspace string, vtworkers []string, shardsToSplit [][][]string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType, phaseEnableApprovals string, skipStartWorkflows bool, useConsistentSnapshot bool) (*workflowpb.WorkflowCheckpoint, error) {
sourceShards := 0
destShards := 0
for _, shardToSplit := range shardsToSplit {
Expand Down Expand Up @@ -234,6 +237,7 @@ func initCheckpoint(keyspace string, vtworkers []string, shardsToSplit [][][]str
"skip_start_workflows": fmt.Sprintf("%v", skipStartWorkflows),
"workflows_count": fmt.Sprintf("%v", len(shardsToSplit)),
"keyspace": keyspace,
"use_consistent_snapshot": fmt.Sprintf("%v", useConsistentSnapshot),
},
}, nil
}
Expand Down Expand Up @@ -263,6 +267,7 @@ type reshardingWorkflowGen struct {
splitDiffDestTabletTypeParam string
splitCmdParam string
skipStartWorkflowParam string
useConsistentSnapshot string
}

// Run implements workflow.Workflow interface. It creates one horizontal resharding workflow per shard to split
Expand Down Expand Up @@ -302,6 +307,7 @@ func (hw *reshardingWorkflowGen) workflowCreator(ctx context.Context, task *work
"-source_shards=" + task.Attributes["source_shards"],
"-destination_shards=" + task.Attributes["destination_shards"],
"-phase_enable_approvals=" + hw.phaseEnableApprovalsParam,
"-use_consistent_snapshot=" + hw.useConsistentSnapshot,
}

skipStart, err := strconv.ParseBool(hw.skipStartWorkflowParam)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestWorfklowGenerator(t *testing.T) {
workflow.StartManager(m)
// Create the workflow.
vtworkersParameter := testVtworkers + "," + testVtworkers
uuid, err := m.Create(ctx, keyspaceReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-min_healthy_rdonly_tablets=2"})
uuid, err := m.Create(ctx, keyspaceReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-min_healthy_rdonly_tablets=2", "-use_consistent_snapshot=true"})
if err != nil {
t.Fatalf("cannot create resharding workflow: %v", err)
}
Expand Down
49 changes: 31 additions & 18 deletions web/vtctld2/src/app/shared/flags/workflow.flags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,65 +45,71 @@ export class NewWorkflowFlags {
this.flags['horizontal_resharding_min_healthy_rdonly_tablets'] = new HorizontalReshardingMinHealthyRdonlyTablets(11, 'horizontal_resharding_min_healthy_rdonly_tablets', 'horizontal_resharding');
this.flags['horizontal_resharding_min_healthy_rdonly_tablets'].positional = true;
this.flags['horizontal_resharding_min_healthy_rdonly_tablets'].namedPositional = 'min_healthy_rdonly_tablets';
this.flags['horizontal_resharding_enable_approvals_copy_schema'] = new HorizontalReshardingEnableApprovalsFlag(12, 'horizontal_resharding_enable_approvals_copy_schema', 'Copy Schema enable approvals', 'horizontal_resharding');
this.flags['horizontal_resharding_use_consistent_snapshot'] = new HorizontalReshardingConsistentSnapshotFlag(12, 'horizontal_resharding_use_consistent_snapshot', 'Use Consistent Snapshot', 'horizontal_resharding');
this.flags['horizontal_resharding_use_consistent_snapshot'].positional = true;
this.flags['horizontal_resharding_use_consistent_snapshot'].namedPositional = 'use_consistent_snapshot';
this.flags['horizontal_resharding_enable_approvals_copy_schema'] = new HorizontalReshardingEnableApprovalsFlag(13, 'horizontal_resharding_enable_approvals_copy_schema', 'Copy Schema enable approvals', 'horizontal_resharding');
this.flags['horizontal_resharding_enable_approvals_copy_schema'].positional = true;
this.flags['horizontal_resharding_enable_approvals_copy_schema'].namedPositional = 'copy_schema';
this.flags['horizontal_resharding_enable_approvals_clone'] = new HorizontalReshardingEnableApprovalsFlag(13, 'horizontal_resharding_enable_approvals_clone', 'Clone enable approvals', 'horizontal_resharding');
this.flags['horizontal_resharding_enable_approvals_clone'] = new HorizontalReshardingEnableApprovalsFlag(14, 'horizontal_resharding_enable_approvals_clone', 'Clone enable approvals', 'horizontal_resharding');
this.flags['horizontal_resharding_enable_approvals_clone'].positional = true;
this.flags['horizontal_resharding_enable_approvals_clone'].namedPositional = 'clone';
this.flags['horizontal_resharding_enable_approvals_wait_filtered_replication'] = new HorizontalReshardingEnableApprovalsFlag(14, 'horizontal_resharding_enable_approvals_wait_filtered_replication', 'Wait filtered replication enable approvals', 'horizontal_resharding');
this.flags['horizontal_resharding_enable_approvals_wait_filtered_replication'] = new HorizontalReshardingEnableApprovalsFlag(15, 'horizontal_resharding_enable_approvals_wait_filtered_replication', 'Wait filtered replication enable approvals', 'horizontal_resharding');
this.flags['horizontal_resharding_enable_approvals_wait_filtered_replication'].positional = true;
this.flags['horizontal_resharding_enable_approvals_wait_filtered_replication'].namedPositional = 'wait_for_filtered_replication';
this.flags['horizontal_resharding_enable_approvals_diff'] = new HorizontalReshardingEnableApprovalsFlag(15, 'horizontal_resharding_enable_approvals_diff', 'Diff enable approvals', 'horizontal_resharding');
this.flags['horizontal_resharding_enable_approvals_diff'] = new HorizontalReshardingEnableApprovalsFlag(16, 'horizontal_resharding_enable_approvals_diff', 'Diff enable approvals', 'horizontal_resharding');
this.flags['horizontal_resharding_enable_approvals_diff'].positional = true;
this.flags['horizontal_resharding_enable_approvals_diff'].namedPositional = 'diff';
this.flags['horizontal_resharding_enable_approvals_migrate_serving_types'] = new HorizontalReshardingEnableApprovalsFlag(16, 'horizontal_resharding_enable_approvals_migrate_serving_types', 'Migrate serving types enable approvals', 'horizontal_resharding');
this.flags['horizontal_resharding_enable_approvals_migrate_serving_types'] = new HorizontalReshardingEnableApprovalsFlag(17, 'horizontal_resharding_enable_approvals_migrate_serving_types', 'Migrate serving types enable approvals', 'horizontal_resharding');
this.flags['horizontal_resharding_enable_approvals_migrate_serving_types'].positional = true;
this.flags['horizontal_resharding_enable_approvals_migrate_serving_types'].namedPositional = 'migrate_rdonly,migrate_replica,migrate_master';


this.flags['horizontal_resharding_phase_enable_approvals'] = new HorizontalReshardingPhaseEnableApprovalFlag(17, 'horizontal_resharding_phase_enable_approvals');
this.flags['horizontal_resharding_phase_enable_approvals'] = new HorizontalReshardingPhaseEnableApprovalFlag(18, 'horizontal_resharding_phase_enable_approvals');
this.flags['horizontal_resharding_phase_enable_approvals'].positional = true;
this.flags['horizontal_resharding_phase_enable_approvals'].namedPositional = 'phase_enable_approvals';


// // Flags for keyspace resharding workflow.
this.flags['hr_workflow_gen_keyspace'] = new HorizontalReshardingKeyspaceFlag(18, 'hr_workflow_gen_keyspace', 'hr_workflow_gen');
this.flags['hr_workflow_gen_keyspace'] = new HorizontalReshardingKeyspaceFlag(19, 'hr_workflow_gen_keyspace', 'hr_workflow_gen');
this.flags['hr_workflow_gen_keyspace'].positional = true;
this.flags['hr_workflow_gen_keyspace'].namedPositional = 'keyspace';
this.flags['hr_workflow_gen_vtworkers'] = new HorizontalReshardingVtworkerFlag(19, 'hr_workflow_gen_vtworkers', 'hr_workflow_gen');
this.flags['hr_workflow_gen_vtworkers'] = new HorizontalReshardingVtworkerFlag(20, 'hr_workflow_gen_vtworkers', 'hr_workflow_gen');
this.flags['hr_workflow_gen_vtworkers'].positional = true;
this.flags['hr_workflow_gen_vtworkers'].namedPositional = 'vtworkers';
this.flags['hr_workflow_gen_split_cmd'] = new SplitCloneCommand(20, 'hr_workflow_gen_split_cmd', 'hr_workflow_gen');
this.flags['hr_workflow_gen_split_cmd'] = new SplitCloneCommand(21, 'hr_workflow_gen_split_cmd', 'hr_workflow_gen');
this.flags['hr_workflow_gen_split_cmd'].positional = true;
this.flags['hr_workflow_gen_split_cmd'].namedPositional = 'split_cmd';
this.flags['hr_workflow_gen_split_diff_dest_tablet_type'] = new SplitDiffTabletType(21, 'hr_workflow_gen_split_diff_dest_tablet_type', 'hr_workflow_gen');
this.flags['hr_workflow_gen_split_diff_dest_tablet_type'] = new SplitDiffTabletType(22, 'hr_workflow_gen_split_diff_dest_tablet_type', 'hr_workflow_gen');
this.flags['hr_workflow_gen_split_diff_dest_tablet_type'].positional = true;
this.flags['hr_workflow_gen_split_diff_dest_tablet_type'].namedPositional = 'split_diff_dest_tablet_type';
this.flags['hr_workflow_gen_min_healthy_rdonly_tablets'] = new HorizontalReshardingMinHealthyRdonlyTablets(22, 'hr_workflow_gen_min_healthy_rdonly_tablets', 'hr_workflow_gen');
this.flags['hr_workflow_gen_min_healthy_rdonly_tablets'] = new HorizontalReshardingMinHealthyRdonlyTablets(23, 'hr_workflow_gen_min_healthy_rdonly_tablets', 'hr_workflow_gen');
this.flags['hr_workflow_gen_min_healthy_rdonly_tablets'].positional = true;
this.flags['hr_workflow_gen_min_healthy_rdonly_tablets'].namedPositional = 'min_healthy_rdonly_tablets';
this.flags['hr_workflow_gen_enable_approvals_copy_schema'] = new HorizontalReshardingEnableApprovalsFlag(23, 'hr_workflow_gen_enable_approvals_copy_schema', 'Copy Schema enable approvals', 'hr_workflow_gen');
this.flags['hr_workflow_gen_use_consistent_snapshot'] = new HorizontalReshardingConsistentSnapshotFlag(24, 'hr_workflow_gen_use_consistent_snapshot', 'Use Consistent Snapshot', 'hr_workflow_gen');
this.flags['hr_workflow_gen_use_consistent_snapshot'].positional = true;
this.flags['hr_workflow_gen_use_consistent_snapshot'].namedPositional = 'use_consistent_snapshot';
this.flags['hr_workflow_gen_enable_approvals_copy_schema'] = new HorizontalReshardingEnableApprovalsFlag(25, 'hr_workflow_gen_enable_approvals_copy_schema', 'Copy Schema enable approvals', 'hr_workflow_gen');
this.flags['hr_workflow_gen_enable_approvals_copy_schema'].positional = true;
this.flags['hr_workflow_gen_enable_approvals_copy_schema'].namedPositional = 'copy_schema';
this.flags['hr_workflow_gen_enable_approvals_clone'] = new HorizontalReshardingEnableApprovalsFlag(24, 'hr_workflow_gen_enable_approvals_clone', 'Clone enable approvals', 'hr_workflow_gen');
this.flags['hr_workflow_gen_enable_approvals_clone'] = new HorizontalReshardingEnableApprovalsFlag(26, 'hr_workflow_gen_enable_approvals_clone', 'Clone enable approvals', 'hr_workflow_gen');
this.flags['hr_workflow_gen_enable_approvals_clone'].positional = true;
this.flags['hr_workflow_gen_enable_approvals_clone'].namedPositional = 'clone';
this.flags['hr_workflow_gen_enable_approvals_wait_filtered_replication'] = new HorizontalReshardingEnableApprovalsFlag(25, 'hr_workflow_gen_enable_approvals_wait_filtered_replication', 'Wait filtered replication enable approvals', 'hr_workflow_gen');
this.flags['hr_workflow_gen_enable_approvals_wait_filtered_replication'] = new HorizontalReshardingEnableApprovalsFlag(27, 'hr_workflow_gen_enable_approvals_wait_filtered_replication', 'Wait filtered replication enable approvals', 'hr_workflow_gen');
this.flags['hr_workflow_gen_enable_approvals_wait_filtered_replication'].positional = true;
this.flags['hr_workflow_gen_enable_approvals_wait_filtered_replication'].namedPositional = 'wait_for_filtered_replication';
this.flags['hr_workflow_gen_enable_approvals_diff'] = new HorizontalReshardingEnableApprovalsFlag(26, 'hr_workflow_gen_enable_approvals_diff', 'Diff enable approvals', 'hr_workflow_gen');
this.flags['hr_workflow_gen_enable_approvals_diff'] = new HorizontalReshardingEnableApprovalsFlag(28, 'hr_workflow_gen_enable_approvals_diff', 'Diff enable approvals', 'hr_workflow_gen');
this.flags['hr_workflow_gen_enable_approvals_diff'].positional = true;
this.flags['hr_workflow_gen_enable_approvals_diff'].namedPositional = 'diff';
this.flags['hr_workflow_gen_enable_approvals_migrate_serving_types'] = new HorizontalReshardingEnableApprovalsFlag(27, 'hr_workflow_gen_enable_approvals_migrate_serving_types', 'Migrate serving types enable approvals', 'hr_workflow_gen');
this.flags['hr_workflow_gen_enable_approvals_migrate_serving_types'] = new HorizontalReshardingEnableApprovalsFlag(29, 'hr_workflow_gen_enable_approvals_migrate_serving_types', 'Migrate serving types enable approvals', 'hr_workflow_gen');
this.flags['hr_workflow_gen_enable_approvals_migrate_serving_types'].positional = true;
this.flags['hr_workflow_gen_enable_approvals_migrate_serving_types'].namedPositional = 'migrate_rdonly,migrate_replica,migrate_master';


this.flags['hr_workflow_gen_phase_enable_approvals'] = new HorizontalReshardingPhaseEnableApprovalFlag(28, 'hr_workflow_gen_phase_enable_approvals');
this.flags['hr_workflow_gen_phase_enable_approvals'] = new HorizontalReshardingPhaseEnableApprovalFlag(30, 'hr_workflow_gen_phase_enable_approvals');
this.flags['hr_workflow_gen_phase_enable_approvals'].positional = true;
this.flags['hr_workflow_gen_phase_enable_approvals'].namedPositional = 'phase_enable_approvals';
this.flags['hr_workflow_gen_skip_start_workflows'] = new ReshardingWorkflowGenSkipStartFlag(30, 'hr_workflow_gen_skip_start_workflows');
this.flags['hr_workflow_gen_skip_start_workflows'] = new ReshardingWorkflowGenSkipStartFlag(31, 'hr_workflow_gen_skip_start_workflows');
this.flags['hr_workflow_gen_skip_start_workflows'].positional = true;
this.flags['hr_workflow_gen_skip_start_workflows'].namedPositional = 'skip_start_workflows';

Expand Down Expand Up @@ -268,6 +274,13 @@ export class HorizontalReshardingEnableApprovalsFlag extends CheckBoxFlag {
}
}

export class HorizontalReshardingConsistentSnapshotFlag extends CheckBoxFlag {
constructor(position: number, id: string, name: string, setDisplayOn: string, value = false) {
super(position, id, name, 'Use consistent snapshot to have a stable view of the data.', value);
this.setDisplayOn('factory_name', setDisplayOn);
}
}

// WorkflowFlags is used by the Start / Stop / Delete dialogs.
export class WorkflowFlags {
flags= {};
Expand Down