Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
408 changes: 295 additions & 113 deletions go/vt/proto/binlogdata/binlogdata.pb.go

Large diffs are not rendered by default.

60 changes: 56 additions & 4 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,12 @@ var commands = []commandGroup{
{"MigrateServedFrom", commandMigrateServedFrom,
"[-cells=c1,c2,...] [-reverse] <destination keyspace/shard> <served tablet type>",
"Makes the <destination keyspace/shard> serve the given type. This command also rebuilds the serving graph."},
{"MigrateReads", commandMigrateReads,
"[-cells=c1,c2,...] [-reverse] <target keyspace> <workflow> <tablet type>",
"Migrate read traffic for the specified workflow."},
{"MigrateWrites", commandMigrateWrites,
"[-filtered_replication_wait_time=30s] <target keyspace> <workflow>",
"Migrate write traffic for the specified workflow."},
{"CancelResharding", commandCancelResharding,
"<keyspace/shard>",
"Permanently cancels a resharding in progress. All resharding related metadata will be deleted."},
Expand Down Expand Up @@ -1760,9 +1766,9 @@ func commandVerticalSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFl

func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update")
reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward. Use in case of trouble")
reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward.")
skipReFreshState := subFlags.Bool("skip-refresh-state", false, "Skips refreshing the state of the source tablets after the migration, meaning that the refresh will need to be done manually, replica and rdonly only)")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be aborted on timeout.")
reverseReplication := subFlags.Bool("reverse_replication", false, "For master migration, enabling this flag reverses replication which allows you to rollback")
if err := subFlags.Parse(args); err != nil {
return err
Expand Down Expand Up @@ -1790,9 +1796,9 @@ func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFl
}

func commandMigrateServedFrom(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward. Use in case of trouble")
reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward.")
cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be aborted on timeout.")
if err := subFlags.Parse(args); err != nil {
return err
}
Expand All @@ -1815,6 +1821,52 @@ func commandMigrateServedFrom(ctx context.Context, wr *wrangler.Wrangler, subFla
return wr.MigrateServedFrom(ctx, keyspace, shard, servedType, cells, *reverse, *filteredReplicationWaitTime)
}

// commandMigrateReads implements the MigrateReads vtctl command, which
// migrates read traffic for the specified workflow.
//
// Flags:
//
//	-reverse  migrate backward instead of forward.
//	-cells    comma-separated list of cells to update.
//
// Positional arguments, all required: <target keyspace> <workflow> <tablet type>.
// The tablet type is validated by parseTabletType, which is given REPLICA and
// RDONLY as the accepted types.
//
// It returns any flag-parsing or argument-validation error, or otherwise the
// error returned by wr.MigrateReads.
func commandMigrateReads(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
	reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward.")
	cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update")
	if err := subFlags.Parse(args); err != nil {
		return err
	}
	if subFlags.NArg() != 3 {
		// The first argument is handed to the wrangler as a bare keyspace name,
		// so the message must not advertise a keyspace/shard form.
		return fmt.Errorf("the <target keyspace>, <workflow> and <tablet type> arguments are required for the MigrateReads command")
	}

	keyspace := subFlags.Arg(0)
	workflow := subFlags.Arg(1)
	servedType, err := parseTabletType(subFlags.Arg(2), []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY})
	if err != nil {
		return err
	}

	// cells stays nil when -cells is not supplied; splitting an empty string
	// would instead yield a one-element slice containing "".
	var cells []string
	if *cellsStr != "" {
		cells = strings.Split(*cellsStr, ",")
	}

	direction := wrangler.DirectionForward
	if *reverse {
		direction = wrangler.DirectionBackward
	}
	return wr.MigrateReads(ctx, keyspace, workflow, servedType, cells, direction)
}

// commandMigrateWrites implements the MigrateWrites vtctl command, which
// migrates write traffic for the specified workflow.
//
// Flags:
//
//	-filtered_replication_wait_time  maximum time to wait for filtered
//	                                 replication to catch up (default 30s).
//
// Positional arguments, both required: <target keyspace> <workflow>.
//
// On success the journal ID returned by wr.MigrateWrites is logged at info
// level through the wrangler's logger and nil is returned. Otherwise the
// flag-parsing, argument-validation or wr.MigrateWrites error is returned.
func commandMigrateWrites(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
	filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be aborted on timeout.")
	if err := subFlags.Parse(args); err != nil {
		return err
	}
	if subFlags.NArg() != 2 {
		// The first argument is handed to the wrangler as a bare keyspace name,
		// so the message must not advertise a keyspace/shard form.
		return fmt.Errorf("the <target keyspace> and <workflow> arguments are required for the MigrateWrites command")
	}

	keyspace := subFlags.Arg(0)
	workflow := subFlags.Arg(1)
	journalID, err := wr.MigrateWrites(ctx, keyspace, workflow, *filteredReplicationWaitTime)
	if err != nil {
		return err
	}
	wr.Logger().Infof("Migration Journal ID: %v", journalID)
	return nil
}

func commandCancelResharding(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
if err := subFlags.Parse(args); err != nil {
return err
Expand Down
56 changes: 44 additions & 12 deletions go/vt/vttablet/tabletmanager/vreplication/controller_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
updateQuery
deleteQuery
selectQuery
reshardingJournalQuery
)

// buildControllerPlan parses the input query and returns an appropriate plan.
Expand All @@ -58,15 +59,23 @@ func buildControllerPlan(query string) (*controllerPlan, error) {
}

func buildInsertPlan(ins *sqlparser.Insert) (*controllerPlan, error) {
switch sqlparser.String(ins.Table) {
case reshardingJournalTableName:
return &controllerPlan{
opcode: reshardingJournalQuery,
query: sqlparser.String(ins),
}, nil
case vreplicationTableName:
// no-op
default:
return nil, fmt.Errorf("invalid table name: %v", sqlparser.String(ins.Table))
}
if ins.Action != sqlparser.InsertStr {
return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins))
}
if ins.Ignore != "" {
return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins))
}
if sqlparser.String(ins.Table) != "_vt.vreplication" {
return nil, fmt.Errorf("invalid table name: %v", sqlparser.String(ins.Table))
}
if ins.Partitions != nil {
return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(ins))
}
Expand Down Expand Up @@ -106,7 +115,15 @@ func buildInsertPlan(ins *sqlparser.Insert) (*controllerPlan, error) {
}

func buildUpdatePlan(upd *sqlparser.Update) (*controllerPlan, error) {
if sqlparser.String(upd.TableExprs) != "_vt.vreplication" {
switch sqlparser.String(upd.TableExprs) {
case reshardingJournalTableName:
return &controllerPlan{
opcode: reshardingJournalQuery,
query: sqlparser.String(upd),
}, nil
case vreplicationTableName:
// no-op
default:
return nil, fmt.Errorf("invalid table name: %v", sqlparser.String(upd.TableExprs))
}
if upd.OrderBy != nil || upd.Limit != nil {
Expand All @@ -131,12 +148,20 @@ func buildUpdatePlan(upd *sqlparser.Update) (*controllerPlan, error) {
}

func buildDeletePlan(del *sqlparser.Delete) (*controllerPlan, error) {
switch sqlparser.String(del.TableExprs) {
case reshardingJournalTableName:
return &controllerPlan{
opcode: reshardingJournalQuery,
query: sqlparser.String(del),
}, nil
case vreplicationTableName:
// no-op
default:
return nil, fmt.Errorf("invalid table name: %v", sqlparser.String(del.TableExprs))
}
if del.Targets != nil {
return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(del))
}
if sqlparser.String(del.TableExprs) != "_vt.vreplication" {
return nil, fmt.Errorf("invalid table name: %v", sqlparser.String(del.TableExprs))
}
if del.Partitions != nil {
return nil, fmt.Errorf("unsupported construct: %v", sqlparser.String(del))
}
Expand All @@ -157,13 +182,20 @@ func buildDeletePlan(del *sqlparser.Delete) (*controllerPlan, error) {
}

func buildSelectPlan(sel *sqlparser.Select) (*controllerPlan, error) {
if sqlparser.String(sel.From) != "_vt.vreplication" {
switch sqlparser.String(sel.From) {
case reshardingJournalTableName:
return &controllerPlan{
opcode: reshardingJournalQuery,
query: sqlparser.String(sel),
}, nil
case vreplicationTableName:
return &controllerPlan{
opcode: selectQuery,
query: sqlparser.String(sel),
}, nil
default:
return nil, fmt.Errorf("invalid table name: %v", sqlparser.String(sel.From))
}
return &controllerPlan{
opcode: selectQuery,
query: sqlparser.String(sel),
}, nil
}

func extractID(where *sqlparser.Where) (int, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ func TestControllerPlan(t *testing.T) {
in: "delete from a where id = 1",
err: "invalid table name: a",
}, {
in: "delete a, b from a where id = 1",
err: "unsupported construct: delete a, b from a where id = 1",
in: "delete a, b from _vt.vreplication where id = 1",
err: "unsupported construct: delete a, b from _vt.vreplication where id = 1",
}, {
in: "delete from _vt.vreplication where id = 1 order by id",
err: "unsupported construct: delete from _vt.vreplication where id = 1 order by id asc",
Expand Down
33 changes: 24 additions & 9 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ import (
"vitess.io/vitess/go/vt/topo"
)

const (
// reshardingJournalTableName is the fully qualified name of the resharding
// journal table.
reshardingJournalTableName = "_vt.resharding_journal"
// vreplicationTableName is the fully qualified name of the vreplication table.
vreplicationTableName = "_vt.vreplication"
// createReshardingJournalTable is the DDL statement that creates the
// resharding journal table if it does not already exist. The table has an
// id primary key plus db_name and val columns.
createReshardingJournalTable = `create table if not exists _vt.resharding_journal(
id bigint,
db_name varbinary(255),
val blob,
primary key (id)
) ENGINE=InnoDB`
)

var tabletTypesStr = flag.String("vreplication_tablet_type", "REPLICA", "comma separated list of tablet types used as a source")

// waitRetryTime can be changed to a smaller value for tests.
Expand Down Expand Up @@ -102,37 +113,41 @@ func (vre *Engine) Open(ctx context.Context) error {

// executeFetchMaybeCreateTable calls DBClient.ExecuteFetch and does one retry if
// there's a failure due to mysql.ERNoSuchTable or mysql.ERBadDb which can be fixed
// by re-creating the _vt.vreplication table.
// by re-creating the vreplication tables.
func (vre *Engine) executeFetchMaybeCreateTable(dbClient binlogplayer.DBClient, query string, maxrows int) (qr *sqltypes.Result, err error) {
qr, err = dbClient.ExecuteFetch(query, maxrows)

if err == nil {
return
}

// If it's a bad table or db, it could be because _vt.vreplication wasn't created.
// In that case we can try creating it again.
// If it's a bad table or db, it could be because the vreplication tables weren't created.
// In that case we can try creating them again.
merr, isSQLErr := err.(*mysql.SQLError)
if !isSQLErr || !(merr.Num == mysql.ERNoSuchTable || merr.Num == mysql.ERBadDb || merr.Num == mysql.ERBadFieldError) {
return qr, err
}

log.Info("Looks like _vt.vreplication table may not exist. Trying to recreate... ")
log.Info("Looks like the vreplcation tables may not exist. Trying to recreate... ")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forget whether it is this log, but there is one similar to this that in practice becomes noisy and confusing. I think we should remove it and only log if there are errors.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the confusing comment a few weeks ago. It was in tablet manager. This should not spam and looks useful, because it will let us know when the tables got created.

if merr.Num == mysql.ERNoSuchTable || merr.Num == mysql.ERBadDb {
for _, query := range binlogplayer.CreateVReplicationTable() {
if _, merr := dbClient.ExecuteFetch(query, 0); merr != nil {
log.Warningf("Failed to ensure _vt.vreplication table exists: %v", merr)
log.Warningf("Failed to ensure %s exists: %v", vreplicationTableName, merr)
return nil, err
}
}
if _, merr := dbClient.ExecuteFetch(createReshardingJournalTable, 0); merr != nil {
log.Warningf("Failed to ensure %s exists: %v", reshardingJournalTableName, merr)
return nil, err
}
}
if merr.Num == mysql.ERBadFieldError {
log.Info("Adding column to table _vt.vreplication")
log.Infof("Adding column to table %s", vreplicationTableName)
for _, query := range binlogplayer.AlterVReplicationTable() {
if _, merr := dbClient.ExecuteFetch(query, 0); merr != nil {
merr, isSQLErr := err.(*mysql.SQLError)
if !isSQLErr || !(merr.Num == mysql.ERDupFieldName) {
log.Warningf("Failed to alter _vt.vreplication table: %v", merr)
log.Warningf("Failed to alter %s table: %v", vreplicationTableName, merr)
return nil, err
}
}
Expand Down Expand Up @@ -287,8 +302,8 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) {
delete(vre.controllers, plan.id)
}
return vre.executeFetchMaybeCreateTable(dbClient, plan.query, 1)
case selectQuery:
// select queries are passed through.
case selectQuery, reshardingJournalQuery:
// select and resharding journal queries are passed through.
return vre.executeFetchMaybeCreateTable(dbClient, plan.query, 10000)
}
panic("unreachable")
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ func TestCreateDBAndTable(t *testing.T) {
dbClient.ExpectRequest("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("DROP TABLE IF EXISTS _vt.blp_checkpoint", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("CREATE TABLE IF NOT EXISTS _vt.vreplication.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("create table if not exists _vt.resharding_journal.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)

// Non-recoverable error.
Expand All @@ -425,6 +426,7 @@ func TestCreateDBAndTable(t *testing.T) {
dbClient.ExpectRequest("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("DROP TABLE IF EXISTS _vt.blp_checkpoint", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("CREATE TABLE IF NOT EXISTS _vt.vreplication.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("create table if not exists _vt.resharding_journal.*", &sqltypes.Result{}, nil)

dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{InsertID: 1}, nil)

Expand Down
Loading