diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index 3e81c95c639..952a52d2fdd 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -531,6 +531,14 @@ func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, posi encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag, timeUpdated, BlpRunning) } +// CreateVReplicationStopped returns a statement to create a stopped vreplication. +func CreateVReplicationStopped(workflow string, source *binlogdatapb.BinlogSource, position string) string { + return fmt.Sprintf("insert into _vt.vreplication "+ + "(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state) "+ + "values (%v, %v, %v, %v, %v, %v, 0, '%v')", + encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), BlpStopped) +} + // updateVReplicationPos returns a statement to update a value in the // _vt.vreplication table. func updateVReplicationPos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64) string { diff --git a/go/vt/binlog/binlogplayer/fake_dbclient.go b/go/vt/binlog/binlogplayer/fake_dbclient.go index a2f4e091fb9..24bb2a4b043 100644 --- a/go/vt/binlog/binlogplayer/fake_dbclient.go +++ b/go/vt/binlog/binlogplayer/fake_dbclient.go @@ -76,6 +76,8 @@ func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re ), nil } return &sqltypes.Result{}, nil + case strings.HasPrefix(query, "use"): + return &sqltypes.Result{}, nil } return nil, fmt.Errorf("unexpected: %v", query) } diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 9fc6b7d7d99..1ec9eb8e862 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -1684,6 +1684,7 @@ func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFl reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward. Use in case of trouble") skipReFreshState := subFlags.Bool("skip-refresh-state", false, "Skips refreshing the state of the source tablets after the migration, meaning that the refresh will need to be done manually, replica and rdonly only)") filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations") + reverseReplication := subFlags.Bool("reverse_replication", false, "For master migration, enabling this flag reverses replication allowing which allows you to rollback") if err := subFlags.Parse(args); err != nil { return err } @@ -1706,7 +1707,7 @@ func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFl if *cellsStr != "" { cells = strings.Split(*cellsStr, ",") } - return wr.MigrateServedTypes(ctx, keyspace, shard, cells, servedType, *reverse, *skipReFreshState, *filteredReplicationWaitTime) + return wr.MigrateServedTypes(ctx, keyspace, shard, cells, servedType, *reverse, *skipReFreshState, *filteredReplicationWaitTime, *reverseReplication) } func commandMigrateServedFrom(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index abf1b0b9efd..1b272473694 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -194,6 +194,13 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) { vre.mustCreate = false } + // Change the database to ensure that these events don't get + // replicated by another vreplication. This can happen when + // we reverse replication. + if _, err := dbClient.ExecuteFetch("use _vt", 1); err != nil { + return nil, err + } + switch plan.opcode { case insertQuery: qr, err := dbClient.ExecuteFetch(plan.query, 1) diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go index d10f91ff177..24c2140e5dd 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go @@ -99,6 +99,7 @@ func TestEngineExec(t *testing.T) { } defer vre.Close() + dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{InsertID: 1}, nil) dbClient.ExpectRequest("select * from _vt.vreplication where id = 1", sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -138,6 +139,7 @@ func TestEngineExec(t *testing.T) { savedBlp := ct.blpStats + dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("update _vt.vreplication set pos = 'MariaDB/0-1-1084', state = 'Running' where id = 1", testDMLResponse, nil) dbClient.ExpectRequest("select * from _vt.vreplication where id = 1", sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -177,6 +179,7 @@ func TestEngineExec(t *testing.T) { // Test Delete + dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) delQuery := "delete from _vt.vreplication where id = 1" dbClient.ExpectRequest(delQuery, testDMLResponse, nil) @@ -220,6 +223,7 @@ func TestEngineBadInsert(t *testing.T) { } defer vre.Close() + dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{}, nil) _, err := vre.Exec("insert into _vt.vreplication values(null)") want := "insert failed to generate an id" @@ -250,6 +254,7 @@ func TestEngineSelect(t *testing.T) { } defer vre.Close() + dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) wantQuery := "select * from _vt.vreplication where workflow = 'x'" wantResult := sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -400,6 +405,7 @@ func TestCreateDBAndTable(t *testing.T) { dbClient.ExpectRequest("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("DROP TABLE IF EXISTS _vt.blp_checkpoint", &sqltypes.Result{}, nil) dbClient.ExpectRequestRE("CREATE TABLE IF NOT EXISTS _vt.vreplication.*", &sqltypes.Result{}, nil) + dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{InsertID: 1}, nil) dbClient.ExpectRequest("select * from _vt.vreplication where id = 1", sqltypes.MakeTestResult( sqltypes.MakeTestFields( diff --git a/go/vt/workflow/resharding/mock_resharding_wrangler_test.go b/go/vt/workflow/resharding/mock_resharding_wrangler_test.go index 42e735c5a3e..21b68ce7f45 100644 --- a/go/vt/workflow/resharding/mock_resharding_wrangler_test.go +++ b/go/vt/workflow/resharding/mock_resharding_wrangler_test.go @@ -61,13 +61,13 @@ func (mr *MockReshardingWranglerMockRecorder) WaitForFilteredReplication(ctx, ke } // MigrateServedTypes mocks base method -func (m *MockReshardingWrangler) MigrateServedTypes(ctx context.Context, keyspace, shard string, cells []string, servedType topodata.TabletType, reverse, skipReFreshState bool, filteredReplicationWaitTime time.Duration) error { - ret := m.ctrl.Call(m, "MigrateServedTypes", ctx, keyspace, shard, cells, servedType, reverse, skipReFreshState, filteredReplicationWaitTime) +func (m *MockReshardingWrangler) MigrateServedTypes(ctx context.Context, keyspace, shard string, cells []string, servedType topodata.TabletType, reverse, skipReFreshState bool, filteredReplicationWaitTime time.Duration, reverseReplication bool) error { + ret := m.ctrl.Call(m, "MigrateServedTypes", ctx, keyspace, shard, cells, servedType, reverse, skipReFreshState, filteredReplicationWaitTime, reverseReplication) ret0, _ := ret[0].(error) return ret0 } // MigrateServedTypes indicates an expected call of MigrateServedTypes -func (mr *MockReshardingWranglerMockRecorder) MigrateServedTypes(ctx, keyspace, shard, cells, servedType, reverse, skipReFreshState, filteredReplicationWaitTime interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MigrateServedTypes", reflect.TypeOf((*MockReshardingWrangler)(nil).MigrateServedTypes), ctx, keyspace, shard, cells, servedType, reverse, skipReFreshState, filteredReplicationWaitTime) +func (mr *MockReshardingWranglerMockRecorder) MigrateServedTypes(ctx, keyspace, shard, cells, servedType, reverse, skipReFreshState, filteredReplicationWaitTime, reverseReplication interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MigrateServedTypes", reflect.TypeOf((*MockReshardingWrangler)(nil).MigrateServedTypes), ctx, keyspace, shard, cells, servedType, reverse, skipReFreshState, filteredReplicationWaitTime, reverseReplication) } diff --git a/go/vt/workflow/resharding/resharding_wrangler.go b/go/vt/workflow/resharding/resharding_wrangler.go index 6b988f1c6a9..c06405df2b9 100644 --- a/go/vt/workflow/resharding/resharding_wrangler.go +++ b/go/vt/workflow/resharding/resharding_wrangler.go @@ -33,5 +33,5 @@ type ReshardingWrangler interface { WaitForFilteredReplication(ctx context.Context, keyspace, shard string, maxDelay time.Duration) error - MigrateServedTypes(ctx context.Context, keyspace, shard string, cells []string, servedType topodatapb.TabletType, reverse, skipReFreshState bool, filteredReplicationWaitTime time.Duration) error + MigrateServedTypes(ctx context.Context, keyspace, shard string, cells []string, servedType topodatapb.TabletType, reverse, skipReFreshState bool, filteredReplicationWaitTime time.Duration, reverseReplication bool) error } diff --git a/go/vt/workflow/resharding/tasks.go b/go/vt/workflow/resharding/tasks.go index 61ae80e5b97..e55a8c9014a 100644 --- a/go/vt/workflow/resharding/tasks.go +++ b/go/vt/workflow/resharding/tasks.go @@ -120,5 +120,5 @@ func (hw *horizontalReshardingWorkflow) runMigrate(ctx context.Context, t *workf return fmt.Errorf("wrong served type to be migrated: %v", servedTypeStr) } - return hw.wr.MigrateServedTypes(ctx, keyspace, sourceShard, nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime) + return hw.wr.MigrateServedTypes(ctx, keyspace, sourceShard, nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime, true /* reverseReplication */) } diff --git a/go/vt/workflow/resharding/workflow_test.go b/go/vt/workflow/resharding/workflow_test.go index b5c384a85f1..023cfd55492 100644 --- a/go/vt/workflow/resharding/workflow_test.go +++ b/go/vt/workflow/resharding/workflow_test.go @@ -166,7 +166,7 @@ func setupMockWrangler(ctrl *gomock.Controller, keyspace string) *MockResharding topodatapb.TabletType_REPLICA, topodatapb.TabletType_MASTER} for _, servedType := range servedTypeParams { - mockWranglerInterface.EXPECT().MigrateServedTypes(gomock.Any(), keyspace, "0", nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime).Return(nil) + mockWranglerInterface.EXPECT().MigrateServedTypes(gomock.Any(), keyspace, "0", nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime, true /* reverseReplication */).Return(nil) } return mockWranglerInterface } diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index 91ef72bebfa..3c1296d4e54 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -33,6 +33,7 @@ import ( "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/topotools/events" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) @@ -89,7 +90,7 @@ func (wr *Wrangler) SetKeyspaceShardingInfo(ctx context.Context, keyspace, shard // MigrateServedTypes is used during horizontal splits to migrate a // served type from a list of shards to another. -func (wr *Wrangler) MigrateServedTypes(ctx context.Context, keyspace, shard string, cells []string, servedType topodatapb.TabletType, reverse, skipReFreshState bool, filteredReplicationWaitTime time.Duration) (err error) { +func (wr *Wrangler) MigrateServedTypes(ctx context.Context, keyspace, shard string, cells []string, servedType topodatapb.TabletType, reverse, skipReFreshState bool, filteredReplicationWaitTime time.Duration, reverseReplication bool) (err error) { // check input parameters if servedType == topodatapb.TabletType_MASTER { // we cannot migrate a master back, since when master migration @@ -133,7 +134,7 @@ func (wr *Wrangler) MigrateServedTypes(ctx context.Context, keyspace, shard stri // execute the migration if servedType == topodatapb.TabletType_MASTER { - if err = wr.masterMigrateServedType(ctx, keyspace, sourceShards, destinationShards, filteredReplicationWaitTime); err != nil { + if err = wr.masterMigrateServedType(ctx, keyspace, sourceShards, destinationShards, filteredReplicationWaitTime, reverseReplication); err != nil { return err } } else { @@ -179,6 +180,27 @@ func (wr *Wrangler) MigrateServedTypes(ctx context.Context, keyspace, shard stri // findSourceDest derives the source and destination from the overlapping shards. // Whichever side has SourceShards is a destination. func (wr *Wrangler) findSourceDest(ctx context.Context, os *topotools.OverlappingShards) (sourceShards, destinationShards []*topo.ShardInfo, err error) { + // It's possible that both source and destination have source shards because of reversible replication. + // If so, the Frozen flag in the tablet control record dictates the direction. + // So, check that first. + for _, left := range os.Left { + tc := left.GetTabletControl(topodatapb.TabletType_MASTER) + if tc == nil { + continue + } + if tc.Frozen { + return os.Left, os.Right, nil + } + } + for _, right := range os.Right { + tc := right.GetTabletControl(topodatapb.TabletType_MASTER) + if tc == nil { + continue + } + if tc.Frozen { + return os.Right, os.Left, nil + } + } for _, left := range os.Left { if len(left.SourceShards) != 0 { return os.Right, os.Left, nil @@ -334,7 +356,7 @@ func (wr *Wrangler) replicaMigrateServedType(ctx context.Context, keyspace strin } // masterMigrateServedType operates with the keyspace locked -func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string, sourceShards, destinationShards []*topo.ShardInfo, filteredReplicationWaitTime time.Duration) (err error) { +func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string, sourceShards, destinationShards []*topo.ShardInfo, filteredReplicationWaitTime time.Duration, reverseReplication bool) (err error) { // Ensure other served types have migrated. if si := sourceShards[0]; len(si.ServedTypes) > 1 { var types []string @@ -388,12 +410,18 @@ func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string } // We've reached the point of no return. Freeze the tablet control records in the source masters. - if err := wr.freezeMasterMigrateServedType(ctx, sourceShards); err != nil { + if err := wr.updateFrozenFlag(ctx, sourceShards, true); err != nil { wr.cancelMasterMigrateServedTypes(ctx, sourceShards) return err } // Phase 2 + // Always setup reverse replication. We'll start it later if reverseReplication was specified. + // This will allow someone to reverse the replication later if they change their mind. + if err := wr.setupReverseReplication(ctx, sourceShards, destinationShards); err != nil { + return err + } + // Destination shards need different handling than what updateShardRecords does. event.DispatchUpdate(ev, "updating destination shards") for i, si := range destinationShards { @@ -429,6 +457,16 @@ func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string return err } + if reverseReplication { + if err := wr.startReverseReplication(ctx, sourceShards); err != nil { + return err + } + // We also have to remove the frozen flag as final step. + if err := wr.updateFrozenFlag(ctx, sourceShards, false); err != nil { + return err + } + } + event.DispatchUpdate(ev, "finished") return nil } @@ -443,6 +481,82 @@ func (wr *Wrangler) cancelMasterMigrateServedTypes(ctx context.Context, sourceSh } } +func (wr *Wrangler) setupReverseReplication(ctx context.Context, sourceShards, destinationShards []*topo.ShardInfo) error { + // Retrieve master positions of all destinations. + masterPositions := make([]string, len(destinationShards)) + for i, dest := range destinationShards { + ti, err := wr.ts.GetTablet(ctx, dest.MasterAlias) + if err != nil { + return err + } + + wr.Logger().Infof("Gathering master position for %v", topoproto.TabletAliasString(dest.MasterAlias)) + masterPositions[i], err = wr.tmc.MasterPosition(ctx, ti.Tablet) + if err != nil { + return err + } + } + + // Create reverse replication for each source. + for i, sourceShard := range sourceShards { + if len(sourceShard.SourceShards) != 0 { + continue + } + // Handle the case where the source is "unsharded". + kr := sourceShard.KeyRange + if kr == nil { + kr = &topodatapb.KeyRange{} + } + // Create replications streams first using the retrieved master positions. + uids := make([]uint32, len(destinationShards)) + for j, dest := range destinationShards { + bls := &binlogdatapb.BinlogSource{ + Keyspace: dest.Keyspace(), + Shard: dest.ShardName(), + KeyRange: kr, + } + qr, err := wr.VReplicationExec(ctx, sourceShard.MasterAlias, binlogplayer.CreateVReplicationStopped("ReversedResharding", bls, masterPositions[j])) + if err != nil { + return err + } + uids[j] = uint32(qr.InsertId) + wr.Logger().Infof("Created reverse replication for tablet %v/%v: %v, pos: %v, uid: %v", sourceShard.Keyspace(), sourceShard.ShardName(), bls, masterPositions[j], uids[j]) + } + // Source shards have to be atomically added to ensure idempotence. + // If this fails, there's no harm because the unstarted vreplication streams will just be abandoned. + var err error + sourceShards[i], err = wr.ts.UpdateShardFields(ctx, sourceShard.Keyspace(), sourceShard.ShardName(), func(si *topo.ShardInfo) error { + for j, dest := range destinationShards { + si.SourceShards = append(si.SourceShards, &topodatapb.Shard_SourceShard{ + Uid: uids[j], + Keyspace: dest.Keyspace(), + Shard: dest.ShardName(), + KeyRange: dest.KeyRange, + }) + } + return nil + }) + if err != nil { + wr.Logger().Errorf("Unstarted vreplication streams for %v/%v need to be deleted: %v", sourceShard.Keyspace(), sourceShard.ShardName(), uids) + return fmt.Errorf("failed to setup reverse replication: %v, unstarted vreplication streams for %v/%v need to be deleted: %v", err, sourceShard.Keyspace(), sourceShard.ShardName(), uids) + } + } + return nil +} + +func (wr *Wrangler) startReverseReplication(ctx context.Context, sourceShards []*topo.ShardInfo) error { + for _, sourceShard := range sourceShards { + for _, dest := range sourceShard.SourceShards { + wr.Logger().Infof("Starting reverse replication for tablet %v/%v, uid: %v", sourceShard.Keyspace(), sourceShard.ShardName(), dest.Uid) + _, err := wr.VReplicationExec(ctx, sourceShard.MasterAlias, binlogplayer.StartVReplication(dest.Uid)) + if err != nil { + return err + } + } + } + return nil +} + // updateShardRecords updates the shard records based on 'from' or 'to' direction. func (wr *Wrangler) updateShardRecords(ctx context.Context, shards []*topo.ShardInfo, cells []string, servedType topodatapb.TabletType, isFrom bool) (err error) { for i, si := range shards { @@ -464,16 +578,16 @@ func (wr *Wrangler) updateShardRecords(ctx context.Context, shards []*topo.Shard return nil } -// freezeMasterMigrateServedType freezes the tablet control record for the source masters -// to prevent them from being removed. -func (wr *Wrangler) freezeMasterMigrateServedType(ctx context.Context, shards []*topo.ShardInfo) (err error) { +// updateFrozenFlag sets or unsets the Frozen flag for master migration. This is performed +// for all master tablet control records. +func (wr *Wrangler) updateFrozenFlag(ctx context.Context, shards []*topo.ShardInfo, value bool) (err error) { for i, si := range shards { shards[i], err = wr.ts.UpdateShardFields(ctx, si.Keyspace(), si.ShardName(), func(si *topo.ShardInfo) error { tc := si.GetTabletControl(topodatapb.TabletType_MASTER) if tc == nil { return fmt.Errorf("unexpected: missing tablet control record for source %v/%v", si.Keyspace(), si.ShardName()) } - tc.Frozen = true + tc.Frozen = value return nil }) if err != nil { diff --git a/go/vt/wrangler/testlib/migrate_served_from_test.go b/go/vt/wrangler/testlib/migrate_served_from_test.go index f4eba888103..bd51f1fdf0c 100644 --- a/go/vt/wrangler/testlib/migrate_served_from_test.go +++ b/go/vt/wrangler/testlib/migrate_served_from_test.go @@ -113,6 +113,7 @@ func TestMigrateServedFrom(t *testing.T) { dbClient.ExpectRequest("select pos from _vt.vreplication where id=1", &sqltypes.Result{Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("MariaDB/5-456-892"), }}}, nil) + dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("delete from _vt.vreplication where id = 1", &sqltypes.Result{RowsAffected: 1}, nil) // simulate the clone, by fixing the dest shard record diff --git a/go/vt/wrangler/testlib/migrate_served_types_test.go b/go/vt/wrangler/testlib/migrate_served_types_test.go index b1778fb3e63..eb8a872eb68 100644 --- a/go/vt/wrangler/testlib/migrate_served_types_test.go +++ b/go/vt/wrangler/testlib/migrate_served_types_test.go @@ -149,6 +149,7 @@ func TestMigrateServedTypes(t *testing.T) { dbClient1.ExpectRequest("select pos from _vt.vreplication where id=1", &sqltypes.Result{Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("MariaDB/5-456-892"), }}}, nil) + dbClient1.ExpectRequest("use _vt", &sqltypes.Result{}, nil) dbClient1.ExpectRequest("delete from _vt.vreplication where id = 1", &sqltypes.Result{RowsAffected: 1}, nil) // dest2Rdonly will see the refresh @@ -175,6 +176,7 @@ func TestMigrateServedTypes(t *testing.T) { dbClient2.ExpectRequest("select pos from _vt.vreplication where id=1", &sqltypes.Result{Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("MariaDB/5-456-892"), }}}, nil) + dbClient2.ExpectRequest("use _vt", &sqltypes.Result{}, nil) dbClient2.ExpectRequest("delete from _vt.vreplication where id = 1", &sqltypes.Result{RowsAffected: 1}, nil) // migrate will error if the overlapping shards have no "SourceShard" entry diff --git a/test/resharding.py b/test/resharding.py index 13488a30094..9532447539f 100755 --- a/test/resharding.py +++ b/test/resharding.py @@ -1134,7 +1134,7 @@ def test_resharding(self): auto_log=True, expect_fail=True) utils.check_tablet_query_service(self, shard_1_master, False, True) - # finally, do the Migration that's expected to succeed + # do the migration that's expected to succeed utils.run_vtctl(['MigrateServedTypes', 'test_keyspace/80-', 'master'], auto_log=True) utils.check_srv_keyspace('test_nj', 'test_keyspace', @@ -1145,6 +1145,32 @@ def test_resharding(self): sharding_column_name='custom_ksid_col') utils.check_tablet_query_service(self, shard_1_master, False, True) + # test reverse_replication + # start with inserting a row in each destination shard + self._insert_value(shard_2_master, 'resharding2', 2, 'msg2', + 0x9000000000000000) + self._insert_value(shard_3_master, 'resharding2', 3, 'msg3', + 0xD000000000000000) + # ensure the rows are not present yet + self._check_value(shard_1_master, 'resharding2', 2, 'msg2', + 0x9000000000000000, should_be_here=False) + self._check_value(shard_1_master, 'resharding2', 3, 'msg3', + 0xD000000000000000, should_be_here=False) + # repeat the migration with reverse_replication + utils.run_vtctl(['MigrateServedTypes', '-reverse_replication=true', + 'test_keyspace/80-', 'master'], auto_log=True) + # look for the rows in the original master after a short wait + time.sleep(1.0) + self._check_value(shard_1_master, 'resharding2', 2, 'msg2', + 0x9000000000000000) + self._check_value(shard_1_master, 'resharding2', 3, 'msg3', + 0xD000000000000000) + + # retry the migration to ensure it now fails + utils.run_vtctl(['MigrateServedTypes', '-reverse_replication=true', + 'test_keyspace/80-', 'master'], + auto_log=True, expect_fail=True) + # check the binlog players are gone now self.check_no_binlog_player(shard_2_master) self.check_no_binlog_player(shard_3_master)