Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ func registerReshardCommands(root *cobra.Command) {
reshard.AddCommand(reverseTrafficCommand)

reshard.AddCommand(common.GetCompleteCommand(opts))
reshard.AddCommand(common.GetCancelCommand(opts))

cancel := common.GetCancelCommand(opts)
cancel.Flags().BoolVar(&common.CancelOptions.KeepData, "keep-data", false, "Keep the partially copied table data from the Reshard workflow in the target shards.")
reshard.AddCommand(cancel)
}

func init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,97 @@ func TestVtctldclientCLI(t *testing.T) {

splitShard(t, targetKeyspaceName, reshardWorkflowName, sourceShard, newShards, tablets)
})

// Reshard Cancel creates a Reshard workflow that splits shard 80- into
// 80-c0 and c0-, starts it, checks that tables are present on both target
// primaries, then cancels the workflow and checks that the workflow is gone
// and that `show tables` returns no rows on either target primary.
t.Run("Reshard Cancel", func(t *testing.T) {
	cell := vc.Cells["zone1"]
	targetKeyspace := cell.Keyspaces[targetKeyspaceName]
	sourceShard := "80-"
	newShards := "80-c0,c0-"
	require.NoError(t, vc.AddShards(t, []*Cell{cell}, targetKeyspace, newShards, 1, 0, 600, nil))
	reshardWorkflowName := "reshard"

	// Primary tablets of the two new target shards, keyed by shard name.
	tablets := map[string]*cluster.VttabletProcess{
		"80-c0": targetKeyspace.Shards["80-c0"].Tablets["zone1-600"].Vttablet,
		"c0-":   targetKeyspace.Shards["c0-"].Tablets["zone1-700"].Vttablet,
	}

	// These are assigned with `=`, so they update variables declared outside
	// this subtest rather than creating new locals.
	sourceReplicaTab = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["80-"].Tablets["zone1-301"].Vttablet
	require.NotNil(t, sourceReplicaTab)
	sourceTab = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["80-"].Tablets["zone1-300"].Vttablet
	require.NotNil(t, sourceTab)

	targetTab1 = tablets["80-c0"]
	require.NotNil(t, targetTab1)
	targetTab2 = tablets["c0-"]
	require.NotNil(t, targetTab2)
	targetReplicaTab1 = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["80-c0"].Tablets["zone1-601"].Vttablet
	require.NotNil(t, targetReplicaTab1)

	overrides := map[string]string{
		"vreplication_copy_phase_duration":     "10h11m12s",
		"vreplication_experimental_flags":      "7",
		"vreplication-parallel-insert-workers": "4",
		"vreplication_net_read_timeout":        "6000",
		"relay_log_max_items":                  "10000",
	}
	// --auto-start=false: the streams are asserted to be Stopped right after
	// Create and are only started by the explicit rs.Start() below.
	createFlags := []string{"--auto-start=false", "--defer-secondary-keys=false",
		"--on-ddl", "STOP", "--tablet-types", "primary,rdonly", "--tablet-types-in-preference-order=true",
		"--all-cells", "--format=json",
		"--config-overrides", mapToCSV(overrides),
	}

	rs := newReshard(vc, &reshardWorkflow{
		workflowInfo: &workflowInfo{
			vc:             vc,
			workflowName:   reshardWorkflowName,
			targetKeyspace: targetKeyspaceName,
		},
		sourceShards: sourceShard,
		targetShards: newShards,
		createFlags:  createFlags,
	}, workflowFlavorVtctld)

	rs.Create()

	// Assert against the subtest's own t (not the captured vc.t) so that a
	// failure is attributed to, and stops, this subtest.
	resp := getReshardResponse(rs)
	require.NotNil(t, resp)
	require.NotNil(t, resp.ShardStreams)
	require.Len(t, resp.ShardStreams, 2)
	keyspace := "customer"
	for _, shard := range []string{"80-c0", "c0-"} {
		streams := resp.ShardStreams[fmt.Sprintf("%s/%s", keyspace, shard)]
		require.Len(t, streams.Streams, 1)
		require.Equal(t, binlogdatapb.VReplicationWorkflowState_Stopped.String(), streams.Streams[0].Status)
	}

	rs.Start()
	// NOTE(review): this waits on `workflowName`, which is not declared in this
	// subtest, rather than the local reshardWorkflowName ("reshard") used to
	// create the workflow above — confirm this is the intended workflow.
	waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())

	// Before the cancel, both target primaries must report at least one table.
	res, err := targetTab1.QueryTablet("show tables", keyspace, true)
	require.NoError(t, err)
	require.NotNil(t, res)
	require.NotEmpty(t, res.Rows)

	res, err = targetTab2.QueryTablet("show tables", keyspace, true)
	require.NoError(t, err)
	require.NotNil(t, res)
	require.NotEmpty(t, res.Rows)

	rs.Cancel()

	// After the cancel, no workflows may remain in the keyspace.
	workflowNames := workflowList(keyspace)
	require.Empty(t, workflowNames)

	// After the cancel, both target primaries must report no tables.
	res, err = targetTab1.QueryTablet("show tables", keyspace, true)
	require.NoError(t, err)
	require.NotNil(t, res)
	require.Empty(t, res.Rows)

	res, err = targetTab2.QueryTablet("show tables", keyspace, true)
	require.NoError(t, err)
	require.NotNil(t, res)
	require.Empty(t, res.Rows)
})
}

// Tests several create flags and some complete flags and validates that some of them are set correctly for the workflow.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1990,7 +1990,7 @@ func (s *Server) dropTargets(ctx context.Context, ts *trafficSwitcher, keepData,
return nil, err
}
case binlogdatapb.MigrationType_SHARDS:
if err := sw.dropTargetShards(ctx); err != nil {
if err := sw.removeTargetTables(ctx); err != nil {
return nil, err
}
}
Expand Down
164 changes: 134 additions & 30 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ import (
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/schematools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
Expand Down Expand Up @@ -1253,47 +1256,148 @@ func (ts *trafficSwitcher) dropSourceReverseVReplicationStreams(ctx context.Cont
}

func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error {
err := ts.ForAllTargets(func(target *MigrationTarget) error {
ts.Logger().Infof("ForAllTargets: %+v", target)
for _, tableName := range ts.Tables() {
primaryDbName, err := sqlescape.EnsureEscaped(target.GetPrimary().DbName())
if err != nil {
return err
switch ts.MigrationType() {
case binlogdatapb.MigrationType_TABLES:
err := ts.ForAllTargets(func(target *MigrationTarget) error {
ts.Logger().Infof("ForAllTargets: %+v", target)
for _, tableName := range ts.Tables() {
primaryDbName, err := sqlescape.EnsureEscaped(target.GetPrimary().DbName())
if err != nil {
return err
}
tableName, err := sqlescape.EnsureEscaped(tableName)
if err != nil {
return err
}
query := fmt.Sprintf("drop table %s.%s", primaryDbName, tableName)
ts.Logger().Infof("%s: Dropping table %s.%s\n",
topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName)
res, err := ts.ws.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,
})
ts.Logger().Infof("Removed target table with result: %+v", res)
if err != nil {
if IsTableDidNotExistError(err) {
// The table was already gone, so we can ignore the error.
ts.Logger().Warningf("%s: Table %s did not exist when attempting to remove it", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName)
} else {
ts.Logger().Errorf("%s: Error removing table %s: %v", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName, err)
return err
}
}
ts.Logger().Infof("%s: Removed table %s.%s\n",
topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName)

}
tableName, err := sqlescape.EnsureEscaped(tableName)
return nil
})
if err != nil {
return err
}

// Remove the tables from the vschema.
return ts.dropParticipatingTablesFromKeyspace(ctx, ts.TargetKeyspaceName())

case binlogdatapb.MigrationType_SHARDS:
// For reshard streams, do the following:
// * get the schema definition from one of the source primaries to
// determine which tables to drop.
// * drop the tables on each of the target shard's primaries
// * do not remove the tables from the vschema
oneSource := ts.SourceShards()[0].PrimaryAlias

// Get the schema definition from one of the source primaries. We only want to drop tables
// that match the vreplication filters.
req := &tabletmanagerdatapb.GetSchemaRequest{Tables: ts.Tables(), ExcludeTables: nil, IncludeViews: false}
sd, err := schematools.GetSchema(ctx, ts.TopoServer(), ts.ws.tmc, oneSource, req)
if err != nil {
return err
}

err = ts.ForAllTargets(func(target *MigrationTarget) error {
primaryDbName, err := sqlescape.EnsureEscaped(target.GetPrimary().DbName())
if err != nil {
return err
}
query := fmt.Sprintf("drop table %s.%s", primaryDbName, tableName)
ts.Logger().Infof("%s: Dropping table %s.%s\n",
topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName)
res, err := ts.ws.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,
})
ts.Logger().Infof("Removed target table with result: %+v", res)
if err != nil {
if IsTableDidNotExistError(err) {
// The table was already gone, so we can ignore the error.
ts.Logger().Warningf("%s: Table %s did not exist when attempting to remove it", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName)
} else {
ts.Logger().Errorf("%s: Error removing table %s: %v", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName, err)

for _, td := range sd.TableDefinitions {
if schema.IsInternalOperationTableName(td.Name) {
continue
}

tableName, err := sqlescape.EnsureEscaped(td.Name)
if err != nil {
return err
}

var query string

if td.Type == tmutils.TableView {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can simplify the code by extracting the code from the two clauses here and the clause for MoveTables, into a common utility function, since they are very similar.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to merge it since we have a code freeze coming up. We can refactor later.

query = fmt.Sprintf("drop view %s.%s", primaryDbName, tableName)
ts.Logger().Infof("%s: Dropping view %s.%s\n",
topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName)

res, err := ts.ws.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,
})

ts.Logger().Infof("Removed target view with result: %+v", res)
if err != nil {
if IsTableDidNotExistError(err) {
// The view was already gone, so we can ignore the error.
ts.Logger().Warningf("%s: view %s did not exist when attempting to remove it", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName)
} else {
ts.Logger().Errorf("%s: Error removing view %s: %v", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName, err)
return err
}
}
ts.Logger().Infof("%s: Removed view %s.%s\n",
topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName)

} else {
query = fmt.Sprintf("drop table %s.%s", primaryDbName, tableName)
ts.Logger().Infof("%s: Dropping table %s.%s\n",
topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName)

res, err := ts.ws.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,
})

ts.Logger().Infof("Removed target table with result: %+v", res)
if err != nil {
if IsTableDidNotExistError(err) {
// The table was already gone, so we can ignore the error.
ts.Logger().Warningf("%s: Table %s did not exist when attempting to remove it", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName)
} else {
ts.Logger().Errorf("%s: Error removing table %s: %v", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName, err)
return err
}
}
ts.Logger().Infof("%s: Removed table %s.%s\n",
topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName)
}
}
ts.Logger().Infof("%s: Removed table %s.%s\n",
topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName)

return nil
})

if err != nil {
return err
}
return nil
})
if err != nil {
return err
default:
return fmt.Errorf("unknown migration type: %v", ts.MigrationType())
}

return ts.dropParticipatingTablesFromKeyspace(ctx, ts.TargetKeyspaceName())
return nil
}

func (ts *trafficSwitcher) dropTargetShards(ctx context.Context) error {
Expand Down
Loading