Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,14 @@ func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, posi
encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag, timeUpdated, BlpRunning)
}

// CreateVReplicationStopped returns a statement to create a stopped vreplication.
func CreateVReplicationStopped(workflow string, source *binlogdatapb.BinlogSource, position string) string {
return fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v')",
encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), BlpStopped)
}

// updateVReplicationPos returns a statement to update a value in the
// _vt.vreplication table.
func updateVReplicationPos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64) string {
Expand Down
2 changes: 2 additions & 0 deletions go/vt/binlog/binlogplayer/fake_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re
), nil
}
return &sqltypes.Result{}, nil
case strings.HasPrefix(query, "use"):
return &sqltypes.Result{}, nil
}
return nil, fmt.Errorf("unexpected: %v", query)
}
3 changes: 2 additions & 1 deletion go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1684,6 +1684,7 @@ func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFl
reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward. Use in case of trouble")
skipReFreshState := subFlags.Bool("skip-refresh-state", false, "Skips refreshing the state of the source tablets after the migration, meaning that the refresh will need to be done manually, replica and rdonly only)")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations")
reverseReplication := subFlags.Bool("reverse_replication", false, "For master migration, enabling this flag reverses replication allowing which allows you to rollback")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove "allowing"

if err := subFlags.Parse(args); err != nil {
return err
}
Expand All @@ -1706,7 +1707,7 @@ func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFl
if *cellsStr != "" {
cells = strings.Split(*cellsStr, ",")
}
return wr.MigrateServedTypes(ctx, keyspace, shard, cells, servedType, *reverse, *skipReFreshState, *filteredReplicationWaitTime)
return wr.MigrateServedTypes(ctx, keyspace, shard, cells, servedType, *reverse, *skipReFreshState, *filteredReplicationWaitTime, *reverseReplication)
}

func commandMigrateServedFrom(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) {
vre.mustCreate = false
}

// Change the database to ensure that these events don't get
// replicated by another vreplication. This can happen when
// we reverse replication.
if _, err := dbClient.ExecuteFetch("use _vt", 1); err != nil {
return nil, err
}

switch plan.opcode {
case insertQuery:
qr, err := dbClient.ExecuteFetch(plan.query, 1)
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func TestEngineExec(t *testing.T) {
}
defer vre.Close()

dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{InsertID: 1}, nil)
dbClient.ExpectRequest("select * from _vt.vreplication where id = 1", sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
Expand Down Expand Up @@ -138,6 +139,7 @@ func TestEngineExec(t *testing.T) {

savedBlp := ct.blpStats

dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("update _vt.vreplication set pos = 'MariaDB/0-1-1084', state = 'Running' where id = 1", testDMLResponse, nil)
dbClient.ExpectRequest("select * from _vt.vreplication where id = 1", sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
Expand Down Expand Up @@ -177,6 +179,7 @@ func TestEngineExec(t *testing.T) {

// Test Delete

dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
delQuery := "delete from _vt.vreplication where id = 1"
dbClient.ExpectRequest(delQuery, testDMLResponse, nil)

Expand Down Expand Up @@ -220,6 +223,7 @@ func TestEngineBadInsert(t *testing.T) {
}
defer vre.Close()

dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{}, nil)
_, err := vre.Exec("insert into _vt.vreplication values(null)")
want := "insert failed to generate an id"
Expand Down Expand Up @@ -250,6 +254,7 @@ func TestEngineSelect(t *testing.T) {
}
defer vre.Close()

dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
wantQuery := "select * from _vt.vreplication where workflow = 'x'"
wantResult := sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
Expand Down Expand Up @@ -400,6 +405,7 @@ func TestCreateDBAndTable(t *testing.T) {
dbClient.ExpectRequest("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("DROP TABLE IF EXISTS _vt.blp_checkpoint", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("CREATE TABLE IF NOT EXISTS _vt.vreplication.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{InsertID: 1}, nil)
dbClient.ExpectRequest("select * from _vt.vreplication where id = 1", sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
Expand Down
8 changes: 4 additions & 4 deletions go/vt/workflow/resharding/mock_resharding_wrangler_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/vt/workflow/resharding/resharding_wrangler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ type ReshardingWrangler interface {

WaitForFilteredReplication(ctx context.Context, keyspace, shard string, maxDelay time.Duration) error

MigrateServedTypes(ctx context.Context, keyspace, shard string, cells []string, servedType topodatapb.TabletType, reverse, skipReFreshState bool, filteredReplicationWaitTime time.Duration) error
MigrateServedTypes(ctx context.Context, keyspace, shard string, cells []string, servedType topodatapb.TabletType, reverse, skipReFreshState bool, filteredReplicationWaitTime time.Duration, reverseReplication bool) error
}
2 changes: 1 addition & 1 deletion go/vt/workflow/resharding/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,5 @@ func (hw *horizontalReshardingWorkflow) runMigrate(ctx context.Context, t *workf
return fmt.Errorf("wrong served type to be migrated: %v", servedTypeStr)
}

return hw.wr.MigrateServedTypes(ctx, keyspace, sourceShard, nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime)
return hw.wr.MigrateServedTypes(ctx, keyspace, sourceShard, nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime, true /* reverseReplication */)
}
2 changes: 1 addition & 1 deletion go/vt/workflow/resharding/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func setupMockWrangler(ctrl *gomock.Controller, keyspace string) *MockResharding
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_MASTER}
for _, servedType := range servedTypeParams {
mockWranglerInterface.EXPECT().MigrateServedTypes(gomock.Any(), keyspace, "0", nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime).Return(nil)
mockWranglerInterface.EXPECT().MigrateServedTypes(gomock.Any(), keyspace, "0", nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime, true /* reverseReplication */).Return(nil)
}
return mockWranglerInterface
}
Expand Down
130 changes: 122 additions & 8 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/topotools/events"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)
Expand Down Expand Up @@ -89,7 +90,7 @@ func (wr *Wrangler) SetKeyspaceShardingInfo(ctx context.Context, keyspace, shard

// MigrateServedTypes is used during horizontal splits to migrate a
// served type from a list of shards to another.
func (wr *Wrangler) MigrateServedTypes(ctx context.Context, keyspace, shard string, cells []string, servedType topodatapb.TabletType, reverse, skipReFreshState bool, filteredReplicationWaitTime time.Duration) (err error) {
func (wr *Wrangler) MigrateServedTypes(ctx context.Context, keyspace, shard string, cells []string, servedType topodatapb.TabletType, reverse, skipReFreshState bool, filteredReplicationWaitTime time.Duration, reverseReplication bool) (err error) {
// check input parameters
if servedType == topodatapb.TabletType_MASTER {
// we cannot migrate a master back, since when master migration
Expand Down Expand Up @@ -133,7 +134,7 @@ func (wr *Wrangler) MigrateServedTypes(ctx context.Context, keyspace, shard stri

// execute the migration
if servedType == topodatapb.TabletType_MASTER {
if err = wr.masterMigrateServedType(ctx, keyspace, sourceShards, destinationShards, filteredReplicationWaitTime); err != nil {
if err = wr.masterMigrateServedType(ctx, keyspace, sourceShards, destinationShards, filteredReplicationWaitTime, reverseReplication); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -179,6 +180,27 @@ func (wr *Wrangler) MigrateServedTypes(ctx context.Context, keyspace, shard stri
// findSourceDest derives the source and destination from the overlapping shards.
// Whichever side has SourceShards is a destination.
func (wr *Wrangler) findSourceDest(ctx context.Context, os *topotools.OverlappingShards) (sourceShards, destinationShards []*topo.ShardInfo, err error) {
// It's possible that both source and destination have source shards because of reversible replication.
// If so, the Frozen flag in the tablet control record dictates the direction.
// So, check that first.
for _, left := range os.Left {
tc := left.GetTabletControl(topodatapb.TabletType_MASTER)
if tc == nil {
continue
}
if tc.Frozen {
return os.Left, os.Right, nil
}
}
for _, right := range os.Right {
tc := right.GetTabletControl(topodatapb.TabletType_MASTER)
if tc == nil {
continue
}
if tc.Frozen {
return os.Right, os.Left, nil
}
}
for _, left := range os.Left {
if len(left.SourceShards) != 0 {
return os.Right, os.Left, nil
Expand Down Expand Up @@ -334,7 +356,7 @@ func (wr *Wrangler) replicaMigrateServedType(ctx context.Context, keyspace strin
}

// masterMigrateServedType operates with the keyspace locked
func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string, sourceShards, destinationShards []*topo.ShardInfo, filteredReplicationWaitTime time.Duration) (err error) {
func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string, sourceShards, destinationShards []*topo.ShardInfo, filteredReplicationWaitTime time.Duration, reverseReplication bool) (err error) {
// Ensure other served types have migrated.
if si := sourceShards[0]; len(si.ServedTypes) > 1 {
var types []string
Expand Down Expand Up @@ -388,12 +410,18 @@ func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string
}

// We've reached the point of no return. Freeze the tablet control records in the source masters.
if err := wr.freezeMasterMigrateServedType(ctx, sourceShards); err != nil {
if err := wr.updateFrozenFlag(ctx, sourceShards, true); err != nil {
wr.cancelMasterMigrateServedTypes(ctx, sourceShards)
return err
}

// Phase 2
// Always setup reverse replication. We'll start it later if reverseReplication was specified.
// This will allow someone to reverse the replication later if they change their mind.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah so even if you don't pass the reverse_replication flag you can still start vreplication and revert the migration?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, you should't start vreplication yourself. You should just reissue MigrateServedTypes with -reverse_replication.

if err := wr.setupReverseReplication(ctx, sourceShards, destinationShards); err != nil {
return err
}

// Destination shards need different handling than what updateShardRecords does.
event.DispatchUpdate(ev, "updating destination shards")
for i, si := range destinationShards {
Expand Down Expand Up @@ -429,6 +457,16 @@ func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string
return err
}

if reverseReplication {
if err := wr.startReverseReplication(ctx, sourceShards); err != nil {
return err
}
// We also have to remove the frozen flag as final step.
if err := wr.updateFrozenFlag(ctx, sourceShards, false); err != nil {
return err
}
}

event.DispatchUpdate(ev, "finished")
return nil
}
Expand All @@ -443,6 +481,82 @@ func (wr *Wrangler) cancelMasterMigrateServedTypes(ctx context.Context, sourceSh
}
}

func (wr *Wrangler) setupReverseReplication(ctx context.Context, sourceShards, destinationShards []*topo.ShardInfo) error {
// Retrieve master positions of all destinations.
masterPositions := make([]string, len(destinationShards))
for i, dest := range destinationShards {
ti, err := wr.ts.GetTablet(ctx, dest.MasterAlias)
if err != nil {
return err
}

wr.Logger().Infof("Gathering master position for %v", topoproto.TabletAliasString(dest.MasterAlias))
masterPositions[i], err = wr.tmc.MasterPosition(ctx, ti.Tablet)
if err != nil {
return err
}
}

// Create reverse replication for each source.
for i, sourceShard := range sourceShards {
if len(sourceShard.SourceShards) != 0 {
continue
}
// Handle the case where the source is "unsharded".
kr := sourceShard.KeyRange
if kr == nil {
kr = &topodatapb.KeyRange{}
}
// Create replications streams first using the retrieved master positions.
uids := make([]uint32, len(destinationShards))
for j, dest := range destinationShards {
bls := &binlogdatapb.BinlogSource{
Keyspace: dest.Keyspace(),
Shard: dest.ShardName(),
KeyRange: kr,
}
qr, err := wr.VReplicationExec(ctx, sourceShard.MasterAlias, binlogplayer.CreateVReplicationStopped("ReversedResharding", bls, masterPositions[j]))
if err != nil {
return err
}
uids[j] = uint32(qr.InsertId)
wr.Logger().Infof("Created reverse replication for tablet %v/%v: %v, pos: %v, uid: %v", sourceShard.Keyspace(), sourceShard.ShardName(), bls, masterPositions[j], uids[j])
}
// Source shards have to be atomically added to ensure idempotence.
// If this fails, there's no harm because the unstarted vreplication streams will just be abandoned.
var err error
sourceShards[i], err = wr.ts.UpdateShardFields(ctx, sourceShard.Keyspace(), sourceShard.ShardName(), func(si *topo.ShardInfo) error {
for j, dest := range destinationShards {
si.SourceShards = append(si.SourceShards, &topodatapb.Shard_SourceShard{
Uid: uids[j],
Keyspace: dest.Keyspace(),
Shard: dest.ShardName(),
KeyRange: dest.KeyRange,
})
}
return nil
})
if err != nil {
wr.Logger().Errorf("Unstarted vreplication streams for %v/%v need to be deleted: %v", sourceShard.Keyspace(), sourceShard.ShardName(), uids)
return fmt.Errorf("failed to setup reverse replication: %v, unstarted vreplication streams for %v/%v need to be deleted: %v", err, sourceShard.Keyspace(), sourceShard.ShardName(), uids)
}
}
return nil
}

func (wr *Wrangler) startReverseReplication(ctx context.Context, sourceShards []*topo.ShardInfo) error {
for _, sourceShard := range sourceShards {
for _, dest := range sourceShard.SourceShards {
wr.Logger().Infof("Starting reverse replication for tablet %v/%v, uid: %v", sourceShard.Keyspace(), sourceShard.ShardName(), dest.Uid)
_, err := wr.VReplicationExec(ctx, sourceShard.MasterAlias, binlogplayer.StartVReplication(dest.Uid))
if err != nil {
return err
}
}
}
return nil
}

// updateShardRecords updates the shard records based on 'from' or 'to' direction.
func (wr *Wrangler) updateShardRecords(ctx context.Context, shards []*topo.ShardInfo, cells []string, servedType topodatapb.TabletType, isFrom bool) (err error) {
for i, si := range shards {
Expand All @@ -464,16 +578,16 @@ func (wr *Wrangler) updateShardRecords(ctx context.Context, shards []*topo.Shard
return nil
}

// freezeMasterMigrateServedType freezes the tablet control record for the source masters
// to prevent them from being removed.
func (wr *Wrangler) freezeMasterMigrateServedType(ctx context.Context, shards []*topo.ShardInfo) (err error) {
// updateFrozenFlag sets or unsets the Frozen flag for master migration. This is performed
// for all master tablet control records.
func (wr *Wrangler) updateFrozenFlag(ctx context.Context, shards []*topo.ShardInfo, value bool) (err error) {
for i, si := range shards {
shards[i], err = wr.ts.UpdateShardFields(ctx, si.Keyspace(), si.ShardName(), func(si *topo.ShardInfo) error {
tc := si.GetTabletControl(topodatapb.TabletType_MASTER)
if tc == nil {
return fmt.Errorf("unexpected: missing tablet control record for source %v/%v", si.Keyspace(), si.ShardName())
}
tc.Frozen = true
tc.Frozen = value
return nil
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions go/vt/wrangler/testlib/migrate_served_from_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func TestMigrateServedFrom(t *testing.T) {
dbClient.ExpectRequest("select pos from _vt.vreplication where id=1", &sqltypes.Result{Rows: [][]sqltypes.Value{{
sqltypes.NewVarBinary("MariaDB/5-456-892"),
}}}, nil)
dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("delete from _vt.vreplication where id = 1", &sqltypes.Result{RowsAffected: 1}, nil)

// simulate the clone, by fixing the dest shard record
Expand Down
2 changes: 2 additions & 0 deletions go/vt/wrangler/testlib/migrate_served_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func TestMigrateServedTypes(t *testing.T) {
dbClient1.ExpectRequest("select pos from _vt.vreplication where id=1", &sqltypes.Result{Rows: [][]sqltypes.Value{{
sqltypes.NewVarBinary("MariaDB/5-456-892"),
}}}, nil)
dbClient1.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient1.ExpectRequest("delete from _vt.vreplication where id = 1", &sqltypes.Result{RowsAffected: 1}, nil)

// dest2Rdonly will see the refresh
Expand All @@ -175,6 +176,7 @@ func TestMigrateServedTypes(t *testing.T) {
dbClient2.ExpectRequest("select pos from _vt.vreplication where id=1", &sqltypes.Result{Rows: [][]sqltypes.Value{{
sqltypes.NewVarBinary("MariaDB/5-456-892"),
}}}, nil)
dbClient2.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient2.ExpectRequest("delete from _vt.vreplication where id = 1", &sqltypes.Result{RowsAffected: 1}, nil)

// migrate will error if the overlapping shards have no "SourceShard" entry
Expand Down
Loading