Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
workflowActionSwitchTraffic = "SwitchTraffic"
workflowActionReverseTraffic = "ReverseTraffic"
workflowActionComplete = "Complete"
workflowActionAbort = "Abort"
workflowActionCancel = "Cancel"
)

var (
Expand Down Expand Up @@ -154,8 +154,8 @@ func tstWorkflowComplete(t *testing.T) error {
return tstWorkflowAction(t, workflowActionComplete, "", "")
}

func tstWorkflowAbort(t *testing.T) error {
return tstWorkflowAction(t, workflowActionAbort, "", "")
func tstWorkflowCancel(t *testing.T) error {
return tstWorkflowAction(t, workflowActionCancel, "", "")
}

func validateReadsRoute(t *testing.T, tabletTypes string, tablet *cluster.VttabletProcess) {
Expand Down Expand Up @@ -276,7 +276,7 @@ func testMoveTablesV2Workflow(t *testing.T) {
output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...)
require.Contains(t, output, "Following workflow(s) found in keyspace customer: wf1")

err := tstWorkflowAbort(t)
err := tstWorkflowCancel(t)
require.NoError(t, err)

output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...)
Expand Down
20 changes: 10 additions & 10 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1958,7 +1958,7 @@ const (
vReplicationWorkflowActionSwitchTraffic = "switchtraffic"
vReplicationWorkflowActionReverseTraffic = "reversetraffic"
vReplicationWorkflowActionComplete = "complete"
vReplicationWorkflowActionAbort = "abort"
vReplicationWorkflowActionCancel = "cancel"
vReplicationWorkflowActionShow = "show"
vReplicationWorkflowActionProgress = "progress"
vReplicationWorkflowActionGetState = "getstate"
Expand All @@ -1970,7 +1970,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
cells := subFlags.String("cells", "", "Cell(s) or CellAlias(es) (comma-separated) to replicate from.")
tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from (e.g. master, replica, rdonly). Defaults to -vreplication_tablet_type parameter value for the tablet, which has the default value of replica.")
dryRun := subFlags.Bool("dry_run", false, "Does a dry run of SwitchReads and only reports the actions to be taken")
timeout := subFlags.Duration("timeout", 30*time.Second, "Specifies the maximum time to wait, in seconds, for vreplication to catch up on master migrations. The migration will be aborted on timeout.")
timeout := subFlags.Duration("timeout", 30*time.Second, "Specifies the maximum time to wait, in seconds, for vreplication to catch up on master migrations. The migration will be cancelled on a timeout.")
reverseReplication := subFlags.Bool("reverse_replication", true, "Also reverse the replication")
keepData := subFlags.Bool("keep_data", false, "Do not drop tables or shards (if true, only vreplication artifacts are cleaned up)")

Expand Down Expand Up @@ -2088,7 +2088,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
vrwp.TabletTypes = *tabletTypes
vrwp.Timeout = *timeout
vrwp.EnableReverseReplication = *reverseReplication
case vReplicationWorkflowActionAbort:
case vReplicationWorkflowActionCancel:
vrwp.KeepData = *keepData
case vReplicationWorkflowActionComplete:
switch workflowType {
Expand Down Expand Up @@ -2212,8 +2212,8 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
err = wf.ReverseTraffic()
case vReplicationWorkflowActionComplete:
err = wf.Complete()
case vReplicationWorkflowActionAbort:
err = wf.Abort()
case vReplicationWorkflowActionCancel:
err = wf.Cancel()
case vReplicationWorkflowActionGetState:
wr.Logger().Printf(wf.CachedState() + "\n")
return nil
Expand Down Expand Up @@ -2300,7 +2300,7 @@ func commandVDiff(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla
sourceCell := subFlags.String("source_cell", "", "The source cell to compare from")
targetCell := subFlags.String("target_cell", "", "The target cell to compare with")
tabletTypes := subFlags.String("tablet_types", "master,replica,rdonly", "Tablet types for source and target")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be aborted on timeout.")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be cancelled on a timeout.")
maxRows := subFlags.Int64("limit", math.MaxInt64, "Max rows to stop comparing after")
format := subFlags.String("format", "", "Format of report") //"json" or ""
tables := subFlags.String("tables", "", "Only run vdiff for these tables in the workflow")
Expand Down Expand Up @@ -2341,7 +2341,7 @@ func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFl
cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update")
reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward.")
skipReFreshState := subFlags.Bool("skip-refresh-state", false, "Skips refreshing the state of the source tablets after the migration, meaning that the refresh will need to be done manually, replica and rdonly only)")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be aborted on timeout.")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be cancelled on a timeout.")
reverseReplication := subFlags.Bool("reverse_replication", false, "For master migration, enabling this flag reverses replication which allows you to rollback")
if err := subFlags.Parse(args); err != nil {
return err
Expand Down Expand Up @@ -2371,7 +2371,7 @@ func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFl
func commandMigrateServedFrom(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward.")
cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be aborted on timeout.")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be cancelled on a timeout.")
if err := subFlags.Parse(args); err != nil {
return err
}
Expand Down Expand Up @@ -2480,8 +2480,8 @@ func commandSwitchReads(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
}

func commandSwitchWrites(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
timeout := subFlags.Duration("timeout", 30*time.Second, "Specifies the maximum time to wait, in seconds, for vreplication to catch up on master migrations. The migration will be aborted on timeout.")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "DEPRECATED Specifies the maximum time to wait, in seconds, for vreplication to catch up on master migrations. The migration will be aborted on timeout.")
timeout := subFlags.Duration("timeout", 30*time.Second, "Specifies the maximum time to wait, in seconds, for vreplication to catch up on master migrations. The migration will be cancelled on a timeout.")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "DEPRECATED Specifies the maximum time to wait, in seconds, for vreplication to catch up on master migrations. The migration will be cancelled on a timeout.")
reverseReplication := subFlags.Bool("reverse_replication", true, "Also reverse the replication")
cancel := subFlags.Bool("cancel", false, "Cancel the failed migration and serve from source")
reverse := subFlags.Bool("reverse", false, "Reverse a previous SwitchWrites serve from source")
Expand Down
2 changes: 1 addition & 1 deletion go/vt/workflow/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (m *Manager) Run(ctx context.Context) {
m.workflows = make(map[string]*runningWorkflow)
m.mu.Unlock()

// Abort the running jobs. They won't save their state as
// Cancel the running jobs. They won't save their state as
// m.ctx is nil and they know it means we're shutting down.
for _, rw := range runningWorkflows {
rw.cancel()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflow s
return ts.id, sw.logs(), nil
}

// DropTargets cleans up target tables, shards and blacklisted tables if a MoveTables/Reshard is aborted
// DropTargets cleans up target tables, shards and blacklisted tables if a MoveTables/Reshard is cancelled
func (wr *Wrangler) DropTargets(ctx context.Context, targetKeyspace, workflow string, keepData, dryRun bool) (*[]string, error) {
ts, err := wr.buildTrafficSwitcher(ctx, targetKeyspace, workflow)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions go/vt/wrangler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (vrw *VReplicationWorkflow) ReverseTraffic() error {
// Workflow errors
const (
ErrWorkflowNotFullySwitched = "cannot complete workflow because you have not yet switched all read and write traffic"
ErrWorkflowPartiallySwitched = "cannot abort workflow because you have already switched some or all read and write traffic"
ErrWorkflowPartiallySwitched = "cannot cancel workflow because you have already switched some or all read and write traffic"
)

// Complete cleans up a successful workflow
Expand All @@ -280,8 +280,8 @@ func (vrw *VReplicationWorkflow) Complete() error {
return nil
}

// Abort deletes all artifacts from a workflow which has not yet been switched
func (vrw *VReplicationWorkflow) Abort() error {
// Cancel deletes all artifacts from a workflow which has not yet been switched
func (vrw *VReplicationWorkflow) Cancel() error {
ws := vrw.ws
if ws.WritesSwitched || len(ws.ReplicaCellsSwitched) > 0 || len(ws.RdonlyCellsSwitched) > 0 {
return fmt.Errorf(ErrWorkflowPartiallySwitched)
Expand Down
10 changes: 5 additions & 5 deletions go/vt/wrangler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestReshardingWorkflowErrorsAndMisc(t *testing.T) {
require.True(t, mtwf.Exists())
require.Errorf(t, mtwf.Complete(), ErrWorkflowNotFullySwitched)
mtwf.ws.WritesSwitched = true
require.Errorf(t, mtwf.Abort(), ErrWorkflowPartiallySwitched)
require.Errorf(t, mtwf.Cancel(), ErrWorkflowPartiallySwitched)

require.ElementsMatch(t, mtwf.getCellsAsArray(), []string{"cell1", "cell2"})
require.ElementsMatch(t, mtwf.getTabletTypes(), []topodata.TabletType{topodata.TabletType_REPLICA, topodata.TabletType_RDONLY})
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestMoveTablesV2Partial(t *testing.T) {

}

func TestMoveTablesV2Abort(t *testing.T) {
func TestMoveTablesV2Cancel(t *testing.T) {
ctx := context.Background()
p := &VReplicationWorkflowParams{
Workflow: "test",
Expand All @@ -312,7 +312,7 @@ func TestMoveTablesV2Abort(t *testing.T) {
require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t1"))
require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t2"))

require.NoError(t, wf.Abort())
require.NoError(t, wf.Cancel())

validateRoutingRuleCount(ctx, t, wf.wr.ts, 0)

Expand Down Expand Up @@ -356,7 +356,7 @@ func TestReshardV2(t *testing.T) {
require.NotNil(t, si)
}

func TestReshardV2Abort(t *testing.T) {
func TestReshardV2Cancel(t *testing.T) {
ctx := context.Background()
sourceShards := []string{"-40", "40-"}
targetShards := []string{"-80", "80-"}
Expand All @@ -378,7 +378,7 @@ func TestReshardV2Abort(t *testing.T) {
require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
tme.expectNoPreviousJournals()
expectReshardQueries(t, tme)
require.NoError(t, wf.Abort())
require.NoError(t, wf.Cancel())
}

func expectReshardQueries(t *testing.T, tme *testShardMigraterEnv) {
Expand Down