diff --git a/go/cmd/vtctldclient/command/vreplication/reshard/reshard.go b/go/cmd/vtctldclient/command/vreplication/reshard/reshard.go index 4b266dbb370..84d2eec1383 100644 --- a/go/cmd/vtctldclient/command/vreplication/reshard/reshard.go +++ b/go/cmd/vtctldclient/command/vreplication/reshard/reshard.go @@ -57,7 +57,10 @@ func registerReshardCommands(root *cobra.Command) { reshard.AddCommand(reverseTrafficCommand) reshard.AddCommand(common.GetCompleteCommand(opts)) - reshard.AddCommand(common.GetCancelCommand(opts)) + + cancel := common.GetCancelCommand(opts) + cancel.Flags().BoolVar(&common.CancelOptions.KeepData, "keep-data", false, "Keep the partially copied table data from the Reshard workflow in the target shards.") + reshard.AddCommand(cancel) } func init() { diff --git a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go index 6cce3fe9fa6..6378de9bdff 100644 --- a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go +++ b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go @@ -127,6 +127,97 @@ func TestVtctldclientCLI(t *testing.T) { splitShard(t, targetKeyspaceName, reshardWorkflowName, sourceShard, newShards, tablets) }) + + t.Run("Reshard Cancel", func(t *testing.T) { + cell := vc.Cells["zone1"] + targetKeyspace := cell.Keyspaces[targetKeyspaceName] + sourceShard := "80-" + newShards := "80-c0,c0-" + require.NoError(t, vc.AddShards(t, []*Cell{cell}, targetKeyspace, newShards, 1, 0, 600, nil)) + reshardWorkflowName := "reshard" + + tablets := map[string]*cluster.VttabletProcess{ + "80-c0": targetKeyspace.Shards["80-c0"].Tablets["zone1-600"].Vttablet, + "c0-": targetKeyspace.Shards["c0-"].Tablets["zone1-700"].Vttablet, + } + + sourceReplicaTab = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["80-"].Tablets["zone1-301"].Vttablet + require.NotNil(t, sourceReplicaTab) + sourceTab = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["80-"].Tablets["zone1-300"].Vttablet + require.NotNil(t, sourceTab) + + targetTab1 = tablets["80-c0"] + require.NotNil(t, targetTab1) + targetTab2 = tablets["c0-"] + require.NotNil(t, targetTab2) + targetReplicaTab1 = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["80-c0"].Tablets["zone1-601"].Vttablet + require.NotNil(t, targetReplicaTab1) + + overrides := map[string]string{ + "vreplication_copy_phase_duration": "10h11m12s", + "vreplication_experimental_flags": "7", + "vreplication-parallel-insert-workers": "4", + "vreplication_net_read_timeout": "6000", + "relay_log_max_items": "10000", + } + createFlags := []string{"--auto-start=false", "--defer-secondary-keys=false", + "--on-ddl", "STOP", "--tablet-types", "primary,rdonly", "--tablet-types-in-preference-order=true", + "--all-cells", "--format=json", + "--config-overrides", mapToCSV(overrides), + } + + rs := newReshard(vc, &reshardWorkflow{ + workflowInfo: &workflowInfo{ + vc: vc, + workflowName: reshardWorkflowName, + targetKeyspace: targetKeyspaceName, + }, + sourceShards: sourceShard, + targetShards: newShards, + createFlags: createFlags, + }, workflowFlavorVtctld) + + rs.Create() + + resp := getReshardResponse(rs) + require.NotNil(vc.t, resp) + require.NotNil(vc.t, resp.ShardStreams) + require.Equal(vc.t, len(resp.ShardStreams), 2) + keyspace := "customer" + for _, shard := range []string{"80-c0", "c0-"} { + streams := resp.ShardStreams[fmt.Sprintf("%s/%s", keyspace, shard)] + require.Equal(vc.t, 1, len(streams.Streams)) + require.Equal(vc.t, binlogdatapb.VReplicationWorkflowState_Stopped.String(), streams.Streams[0].Status) + } + + rs.Start() + waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String()) + + res, err := targetTab1.QueryTablet("show tables", keyspace, true) + require.NoError(t, err) + require.NotNil(t, res) + require.NotEmpty(t, res.Rows) + + res, err = targetTab2.QueryTablet("show tables", keyspace, true) + require.NoError(t, err) + require.NotNil(t, res) + require.NotEmpty(t, res.Rows) + + rs.Cancel() + + workflowNames := workflowList(keyspace) + require.Empty(t, workflowNames) + + res, err = targetTab1.QueryTablet("show tables", keyspace, true) + require.NoError(t, err) + require.NotNil(t, res) + require.Empty(t, res.Rows) + + res, err = targetTab2.QueryTablet("show tables", keyspace, true) + require.NoError(t, err) + require.NotNil(t, res) + require.Empty(t, res.Rows) + }) } // Tests several create flags and some complete flags and validates that some of them are set correctly for the workflow. diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index a807585adc6..e37a6bf1504 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1990,7 +1990,7 @@ func (s *Server) dropTargets(ctx context.Context, ts *trafficSwitcher, keepData, return nil, err } case binlogdatapb.MigrationType_SHARDS: - if err := sw.dropTargetShards(ctx); err != nil { + if err := sw.removeTargetTables(ctx); err != nil { return nil, err } } diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 2fe89a8a69c..db4df2ceb4d 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -36,10 +36,13 @@ import ( "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/mysqlctl/tmutils" + "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" @@ -1253,47 +1256,148 @@ func (ts *trafficSwitcher) dropSourceReverseVReplicationStreams(ctx context.Cont } func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error { - err := ts.ForAllTargets(func(target *MigrationTarget) error { - ts.Logger().Infof("ForAllTargets: %+v", target) - for _, tableName := range ts.Tables() { - primaryDbName, err := sqlescape.EnsureEscaped(target.GetPrimary().DbName()) - if err != nil { - return err + switch ts.MigrationType() { + case binlogdatapb.MigrationType_TABLES: + err := ts.ForAllTargets(func(target *MigrationTarget) error { + ts.Logger().Infof("ForAllTargets: %+v", target) + for _, tableName := range ts.Tables() { + primaryDbName, err := sqlescape.EnsureEscaped(target.GetPrimary().DbName()) + if err != nil { + return err + } + tableName, err := sqlescape.EnsureEscaped(tableName) + if err != nil { + return err + } + query := fmt.Sprintf("drop table %s.%s", primaryDbName, tableName) + ts.Logger().Infof("%s: Dropping table %s.%s\n", + topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName) + res, err := ts.ws.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{ + Query: []byte(query), + MaxRows: 1, + ReloadSchema: true, + DisableForeignKeyChecks: true, + }) + ts.Logger().Infof("Removed target table with result: %+v", res) + if err != nil { + if IsTableDidNotExistError(err) { + // The table was already gone, so we can ignore the error. + ts.Logger().Warningf("%s: Table %s did not exist when attempting to remove it", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName) + } else { + ts.Logger().Errorf("%s: Error removing table %s: %v", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName, err) + return err + } + } + ts.Logger().Infof("%s: Removed table %s.%s\n", + topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName) + } - tableName, err := sqlescape.EnsureEscaped(tableName) + return nil + }) + if err != nil { + return err + } + + // Remove the tables from the vschema. + return ts.dropParticipatingTablesFromKeyspace(ctx, ts.TargetKeyspaceName()) + + case binlogdatapb.MigrationType_SHARDS: + // For reshard streams, do the following: + // * get the schema definition from one of the source primaries to + // determine which tables to drop. + // * drop the tables on each of the target shard's primaries + // * do not remove the tables from the vschema + oneSource := ts.SourceShards()[0].PrimaryAlias + + // Get the schema definition from the target primary. We only want to drop tables + // that match the vreplication filters. + req := &tabletmanagerdatapb.GetSchemaRequest{Tables: ts.Tables(), ExcludeTables: nil, IncludeViews: false} + sd, err := schematools.GetSchema(ctx, ts.TopoServer(), ts.ws.tmc, oneSource, req) + if err != nil { + return err + } + + err = ts.ForAllTargets(func(target *MigrationTarget) error { + primaryDbName, err := sqlescape.EnsureEscaped(target.GetPrimary().DbName()) if err != nil { return err } - query := fmt.Sprintf("drop table %s.%s", primaryDbName, tableName) - ts.Logger().Infof("%s: Dropping table %s.%s\n", - topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName) - res, err := ts.ws.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{ - Query: []byte(query), - MaxRows: 1, - ReloadSchema: true, - DisableForeignKeyChecks: true, - }) - ts.Logger().Infof("Removed target table with result: %+v", res) - if err != nil { - if IsTableDidNotExistError(err) { - // The table was already gone, so we can ignore the error. - ts.Logger().Warningf("%s: Table %s did not exist when attempting to remove it", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName) - } else { - ts.Logger().Errorf("%s: Error removing table %s: %v", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName, err) + + for _, td := range sd.TableDefinitions { + if schema.IsInternalOperationTableName(td.Name) { + continue + } + + tableName, err := sqlescape.EnsureEscaped(td.Name) + if err != nil { return err } + + var query string + + if td.Type == tmutils.TableView { + query = fmt.Sprintf("drop view %s.%s", primaryDbName, tableName) + ts.Logger().Infof("%s: Dropping view %s.%s\n", + topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName) + + res, err := ts.ws.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{ + Query: []byte(query), + MaxRows: 1, + ReloadSchema: true, + DisableForeignKeyChecks: true, + }) + + ts.Logger().Infof("Removed target view with result: %+v", res) + if err != nil { + if IsTableDidNotExistError(err) { + // The view was already gone, so we can ignore the error. + ts.Logger().Warningf("%s: view %s did not exist when attempting to remove it", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName) + } else { + ts.Logger().Errorf("%s: Error removing view %s: %v", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName, err) + return err + } + } + ts.Logger().Infof("%s: Removed view %s.%s\n", + topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName) + + } else { + query = fmt.Sprintf("drop table %s.%s", primaryDbName, tableName) + ts.Logger().Infof("%s: Dropping table %s.%s\n", + topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName) + + res, err := ts.ws.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{ + Query: []byte(query), + MaxRows: 1, + ReloadSchema: true, + DisableForeignKeyChecks: true, + }) + + ts.Logger().Infof("Removed target table with result: %+v", res) + if err != nil { + if IsTableDidNotExistError(err) { + // The table was already gone, so we can ignore the error. + ts.Logger().Warningf("%s: Table %s did not exist when attempting to remove it", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName) + } else { + ts.Logger().Errorf("%s: Error removing table %s: %v", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName, err) + return err + } + } + ts.Logger().Infof("%s: Removed table %s.%s\n", + topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName) + } } - ts.Logger().Infof("%s: Removed table %s.%s\n", - topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName) + return nil + }) + + if err != nil { + return err } - return nil - }) - if err != nil { - return err + default: + return fmt.Errorf("unknown migration type: %v", ts.MigrationType()) } - return ts.dropParticipatingTablesFromKeyspace(ctx, ts.TargetKeyspaceName()) + return nil } func (ts *trafficSwitcher) dropTargetShards(ctx context.Context) error {