diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 128799fb2b1..4ed57f6cf0c 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -327,7 +327,7 @@ var commands = []commandGroup{ "[-cells=c1,c2,...] [-reverse] -workflow=workflow ", "Migrate read traffic for the specified workflow."}, {"MigrateWrites", commandMigrateWrites, - "[-filtered_replication_wait_time=30s] -workflow=workflow ", + "[-filtered_replication_wait_time=30s] [-reverse_replication=] -workflow=workflow ", "Migrate write traffic for the specified workflow."}, {"CancelResharding", commandCancelResharding, "", @@ -1919,6 +1919,7 @@ func commandMigrateReads(ctx context.Context, wr *wrangler.Wrangler, subFlags *f func commandMigrateWrites(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be aborted on timeout.") + reverseReplication := subFlags.Bool("reverse_replication", true, "Also reverse the replication") workflow := subFlags.String("workflow", "", "Specifies the workflow name") if err := subFlags.Parse(args); err != nil { return err @@ -1931,7 +1932,7 @@ func commandMigrateWrites(ctx context.Context, wr *wrangler.Wrangler, subFlags * if *workflow == "" { return fmt.Errorf("a -workflow=workflow argument is required") } - journalID, err := wr.MigrateWrites(ctx, keyspace, *workflow, *filteredReplicationWaitTime) + journalID, err := wr.MigrateWrites(ctx, keyspace, *workflow, *filteredReplicationWaitTime, *reverseReplication) if err != nil { return err } diff --git a/go/vt/wrangler/migrater.go b/go/vt/wrangler/migrater.go index 0d73c731e95..9addbda942f 100644 --- a/go/vt/wrangler/migrater.go +++ b/go/vt/wrangler/migrater.go @@ -41,6 +41,10 @@ import ( "vitess.io/vitess/go/vt/vtgate/vindexes" ) +const ( + frozenStr = "FROZEN" +) + // MigrateDirection specifies the migration direction. type MigrateDirection int @@ -61,9 +65,13 @@ const ( // migrater contains the metadata for migrating read and write traffic // for vreplication streams. type migrater struct { - migrationType binlogdatapb.MigrationType - wr *Wrangler - workflow string + migrationType binlogdatapb.MigrationType + wr *Wrangler + workflow string + + // if frozen is true, the rest of the fields are not set. + frozen bool + reverseWorkflow string id int64 sources map[string]*miSource targets map[string]*miTarget @@ -71,7 +79,6 @@ type migrater struct { targetKeyspace string tables []string sourceKSSchema *vindexes.KeyspaceSchema - sourceWorkflows []string } // miTarget contains the metadata for each migration target. @@ -100,6 +107,9 @@ func (wr *Wrangler) MigrateReads(ctx context.Context, targetKeyspace, workflow s wr.Logger().Errorf("buildMigrater failed: %v", err) return err } + if mi.frozen { + return fmt.Errorf("cannot migrate reads while MigrateWrites is in progress") + } if err := mi.validate(ctx, false /* isWrite */); err != nil { mi.wr.Logger().Errorf("validate failed: %v", err) return err @@ -128,12 +138,21 @@ func (wr *Wrangler) MigrateReads(ctx context.Context, targetKeyspace, workflow s } // MigrateWrites is a generic way of migrating write traffic for a resharding workflow. -func (wr *Wrangler) MigrateWrites(ctx context.Context, targetKeyspace, workflow string, filteredReplicationWaitTime time.Duration) (journalID int64, err error) { +func (wr *Wrangler) MigrateWrites(ctx context.Context, targetKeyspace, workflow string, filteredReplicationWaitTime time.Duration, reverseReplication bool) (journalID int64, err error) { mi, err := wr.buildMigrater(ctx, targetKeyspace, workflow) if err != nil { wr.Logger().Errorf("buildMigrater failed: %v", err) return 0, err } + if mi.frozen { + mi.wr.Logger().Infof("Replication has been frozen already. Deleting left-over streams") + if err := mi.deleteTargetVReplication(ctx); err != nil { + mi.wr.Logger().Errorf("deleteTargetVReplication failed: %v", err) + return 0, err + } + return 0, nil + } + mi.wr.Logger().Infof("Built migration metadata: %+v", mi) if err := mi.validate(ctx, true /* isWrite */); err != nil { mi.wr.Logger().Errorf("validate failed: %v", err) @@ -157,34 +176,43 @@ func (wr *Wrangler) MigrateWrites(ctx context.Context, targetKeyspace, workflow defer targetUnlock(&err) } - journalsExist, err := mi.checkJournals(ctx) + // If no journals exist, sourceWorkflows will be initialized by sm.MigrateStreams. + journalsExist, sourceWorkflows, err := mi.checkJournals(ctx) if err != nil { mi.wr.Logger().Errorf("checkJournals failed: %v", err) return 0, err } if !journalsExist { mi.wr.Logger().Infof("No previous journals were found. Proceeding normally.") - sm := &streamMigrater{mi: mi} - tabletStreams, err := sm.stopStreams(ctx) + sm, err := buildStreamMigrater(ctx, mi) + if err != nil { + mi.wr.Logger().Errorf("buildStreamMigrater failed: %v", err) + return 0, err + } + sourceWorkflows, err = sm.stopStreams(ctx) if err != nil { mi.wr.Logger().Errorf("stopStreams failed: %v", err) - mi.cancelMigration(ctx) + mi.cancelMigration(ctx, sm) return 0, err } if err := mi.stopSourceWrites(ctx); err != nil { mi.wr.Logger().Errorf("stopSourceWrites failed: %v", err) - mi.cancelMigration(ctx) + mi.cancelMigration(ctx, sm) return 0, err } if err := mi.waitForCatchup(ctx, filteredReplicationWaitTime); err != nil { mi.wr.Logger().Errorf("waitForCatchup failed: %v", err) - mi.cancelMigration(ctx) + mi.cancelMigration(ctx, sm) return 0, err } - mi.sourceWorkflows, err = sm.migrateStreams(ctx, tabletStreams) - if err != nil { + if err := sm.migrateStreams(ctx); err != nil { mi.wr.Logger().Errorf("migrateStreams failed: %v", err) - mi.cancelMigration(ctx) + mi.cancelMigration(ctx, sm) + return 0, err + } + if err := mi.createReverseVReplication(ctx); err != nil { + mi.wr.Logger().Errorf("createReverseVReplication failed: %v", err) + mi.cancelMigration(ctx, sm) return 0, err } } else { @@ -197,14 +225,10 @@ func (wr *Wrangler) MigrateWrites(ctx context.Context, targetKeyspace, workflow } // This is the point of no return. Once a journal is created, // traffic can be redirected to target shards. - if err := mi.createJournals(ctx); err != nil { + if err := mi.createJournals(ctx, sourceWorkflows); err != nil { mi.wr.Logger().Errorf("createJournals failed: %v", err) return 0, err } - if err := mi.createReverseReplication(ctx); err != nil { - mi.wr.Logger().Errorf("createReverseReplication failed: %v", err) - return 0, err - } if err := mi.allowTargetWrites(ctx); err != nil { mi.wr.Logger().Errorf("allowTargetWrites failed: %v", err) return 0, err @@ -214,27 +238,45 @@ func (wr *Wrangler) MigrateWrites(ctx context.Context, targetKeyspace, workflow return 0, err } sm := &streamMigrater{mi: mi} - if err := sm.finalize(ctx, mi.sourceWorkflows); err != nil { + if err := sm.finalize(ctx, sourceWorkflows); err != nil { mi.wr.Logger().Errorf("finalize failed: %v", err) return 0, err } - mi.deleteTargetVReplication(ctx) + if reverseReplication { + if err := mi.startReverseVReplication(ctx); err != nil { + mi.wr.Logger().Errorf("startReverseVReplication failed: %v", err) + return 0, err + } + } + if err := mi.deleteTargetVReplication(ctx); err != nil { + mi.wr.Logger().Errorf("deleteTargetVReplication failed: %v", err) + return 0, err + } return mi.id, nil } func (wr *Wrangler) buildMigrater(ctx context.Context, targetKeyspace, workflow string) (*migrater, error) { - targets, err := wr.buildMigrationTargets(ctx, targetKeyspace, workflow) + targets, frozen, err := wr.buildMigrationTargets(ctx, targetKeyspace, workflow) if err != nil { return nil, err } + if frozen { + return &migrater{ + wr: wr, + workflow: workflow, + targets: targets, + frozen: true, + }, nil + } mi := &migrater{ - wr: wr, - workflow: workflow, - id: hashStreams(targetKeyspace, targets), - targets: targets, - sources: make(map[string]*miSource), - targetKeyspace: targetKeyspace, + wr: wr, + workflow: workflow, + reverseWorkflow: reverseName(workflow), + id: hashStreams(targetKeyspace, targets), + targets: targets, + sources: make(map[string]*miSource), + targetKeyspace: targetKeyspace, } mi.wr.Logger().Infof("Migration ID for workflow %s: %d", workflow, mi.id) @@ -305,11 +347,11 @@ func (wr *Wrangler) buildMigrater(ctx context.Context, targetKeyspace, workflow return mi, nil } -func (wr *Wrangler) buildMigrationTargets(ctx context.Context, targetKeyspace, workflow string) (targets map[string]*miTarget, err error) { +func (wr *Wrangler) buildMigrationTargets(ctx context.Context, targetKeyspace, workflow string) (targets map[string]*miTarget, frozen bool, err error) { targets = make(map[string]*miTarget) targetShards, err := wr.ts.GetShardNames(ctx, targetKeyspace) if err != nil { - return nil, err + return nil, false, err } // We check all target shards. All of them may not have a stream. // For example, if we're splitting -80 to -40,40-80, only those @@ -317,15 +359,15 @@ func (wr *Wrangler) buildMigrationTargets(ctx context.Context, targetKeyspace, w for _, targetShard := range targetShards { targetsi, err := wr.ts.GetShard(ctx, targetKeyspace, targetShard) if err != nil { - return nil, err + return nil, false, err } targetMaster, err := wr.ts.GetTablet(ctx, targetsi.MasterAlias) if err != nil { - return nil, err + return nil, false, err } - p3qr, err := wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, fmt.Sprintf("select id, source from _vt.vreplication where workflow=%s and db_name=%s", encodeString(workflow), encodeString(targetMaster.DbName()))) + p3qr, err := wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, fmt.Sprintf("select id, source, message from _vt.vreplication where workflow=%s and db_name=%s", encodeString(workflow), encodeString(targetMaster.DbName()))) if err != nil { - return nil, err + return nil, false, err } // If there's no vreplication stream, check the next target. if len(p3qr.Rows) < 1 { @@ -341,19 +383,24 @@ func (wr *Wrangler) buildMigrationTargets(ctx context.Context, targetKeyspace, w for _, row := range qr.Rows { id, err := sqltypes.ToInt64(row[0]) if err != nil { - return nil, err + return nil, false, err } + var bls binlogdatapb.BinlogSource if err := proto.UnmarshalText(row[1].ToString(), &bls); err != nil { - return nil, err + return nil, false, err } targets[targetShard].sources[uint32(id)] = &bls + + if row[2].ToString() == frozenStr { + frozen = true + } } } if len(targets) == 0 { - return nil, fmt.Errorf("no streams found in keyspace %s for: %s", targetKeyspace, workflow) + return nil, false, fmt.Errorf("no streams found in keyspace %s for: %s", targetKeyspace, workflow) } - return targets, nil + return targets, frozen, nil } // hashStreams produces a reproducible hash based on the input parameters. @@ -510,7 +557,9 @@ func (mi *migrater) migrateShardReads(ctx context.Context, cells []string, serve return mi.wr.ts.MigrateServedType(ctx, mi.sourceKeyspace, toShards, fromShards, servedType, cells) } -func (mi *migrater) checkJournals(ctx context.Context) (journalsExist bool, err error) { +// checkJournals returns true if at least one journal has been created. +// If so, it also returns the list of sourceWorkflows that need to be migrated. +func (mi *migrater) checkJournals(ctx context.Context) (journalsExist bool, sourceWorkflows []string, err error) { var mu sync.Mutex journal := &binlogdatapb.Journal{} var exists bool @@ -535,8 +584,7 @@ func (mi *migrater) checkJournals(ctx context.Context) (journalsExist bool, err } return nil }) - mi.sourceWorkflows = journal.SourceWorkflows - return exists, err + return exists, journal.SourceWorkflows, err } func (mi *migrater) stopSourceWrites(ctx context.Context) error { @@ -596,7 +644,7 @@ func (mi *migrater) waitForCatchup(ctx context.Context, filteredReplicationWaitT }) } -func (mi *migrater) cancelMigration(ctx context.Context) { +func (mi *migrater) cancelMigration(ctx context.Context, sm *streamMigrater) { var err error if mi.migrationType == binlogdatapb.MigrationType_TABLES { err = mi.changeTableSourceWrites(ctx, allowWrites) @@ -607,7 +655,6 @@ func (mi *migrater) cancelMigration(ctx context.Context) { mi.wr.Logger().Errorf("Cancel migration failed:", err) } - sm := &streamMigrater{mi: mi} sm.cancelMigration(ctx) err = mi.forAllTargets(func(target *miTarget) error { @@ -618,6 +665,11 @@ func (mi *migrater) cancelMigration(ctx context.Context) { if err != nil { mi.wr.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err) } + + err = mi.deleteReverseVReplication(ctx) + if err != nil { + mi.wr.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err) + } } func (mi *migrater) gatherPositions(ctx context.Context) error { @@ -638,7 +690,60 @@ func (mi *migrater) gatherPositions(ctx context.Context) error { }) } -func (mi *migrater) createJournals(ctx context.Context) error { +func (mi *migrater) createReverseVReplication(ctx context.Context) error { + if err := mi.deleteReverseVReplication(ctx); err != nil { + return err + } + err := mi.forAllUids(func(target *miTarget, uid uint32) error { + bls := target.sources[uid] + source := mi.sources[bls.Shard] + reverseBls := &binlogdatapb.BinlogSource{ + Keyspace: mi.targetKeyspace, + Shard: target.si.ShardName(), + TabletType: bls.TabletType, + Filter: &binlogdatapb.Filter{}, + OnDdl: bls.OnDdl, + } + for _, rule := range bls.Filter.Rules { + var filter string + if strings.HasPrefix(rule.Match, "/") { + if mi.sourceKSSchema.Keyspace.Sharded { + filter = key.KeyRangeString(source.si.KeyRange) + } + } else { + var inKeyrange string + if mi.sourceKSSchema.Keyspace.Sharded { + vtable, ok := mi.sourceKSSchema.Tables[rule.Match] + if !ok { + return fmt.Errorf("table %s not found in vschema", rule.Match) + } + // TODO(sougou): handle degenerate cases like sequence, etc. + // We currently assume the primary vindex is the best way to filter, which may not be true. + inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s', '%s')", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]), vtable.ColumnVindexes[0].Type, key.KeyRangeString(source.si.KeyRange)) + } + filter = fmt.Sprintf("select * from %s%s", rule.Match, inKeyrange) + } + reverseBls.Filter.Rules = append(reverseBls.Filter.Rules, &binlogdatapb.Rule{ + Match: rule.Match, + Filter: filter, + }) + } + + _, err := mi.wr.VReplicationExec(ctx, source.master.Alias, binlogplayer.CreateVReplicationState(mi.reverseWorkflow, reverseBls, target.position, binlogplayer.BlpStopped, source.master.DbName())) + return err + }) + return err +} + +func (mi *migrater) deleteReverseVReplication(ctx context.Context) error { + return mi.forAllSources(func(source *miSource) error { + query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow=%s", encodeString(source.master.DbName()), encodeString(mi.reverseWorkflow)) + _, err := mi.wr.tmc.VReplicationExec(ctx, source.master.Tablet, query) + return err + }) +} + +func (mi *migrater) createJournals(ctx context.Context, sourceWorkflows []string) error { var participants []*binlogdatapb.KeyspaceShard for sourceShard := range mi.sources { participants = append(participants, &binlogdatapb.KeyspaceShard{ @@ -656,7 +761,7 @@ func (mi *migrater) createJournals(ctx context.Context) error { Tables: mi.tables, LocalPosition: source.position, Participants: participants, - SourceWorkflows: mi.sourceWorkflows, + SourceWorkflows: sourceWorkflows, } for targetShard, target := range mi.targets { found := false @@ -687,46 +792,6 @@ func (mi *migrater) createJournals(ctx context.Context) error { }) } -func (mi *migrater) createReverseReplication(ctx context.Context) error { - return mi.forAllUids(func(target *miTarget, uid uint32) error { - bls := target.sources[uid] - source := mi.sources[bls.Shard] - reverseBls := &binlogdatapb.BinlogSource{ - Keyspace: mi.targetKeyspace, - Shard: target.si.ShardName(), - TabletType: bls.TabletType, - Filter: &binlogdatapb.Filter{}, - } - for _, rule := range bls.Filter.Rules { - var filter string - if strings.HasPrefix(rule.Match, "/") { - if mi.sourceKSSchema.Keyspace.Sharded { - filter = bls.Shard - } - } else { - var inKeyrange string - if mi.sourceKSSchema.Keyspace.Sharded { - vtable, ok := mi.sourceKSSchema.Tables[rule.Match] - if !ok { - return fmt.Errorf("table %s not found in vschema", rule.Match) - } - // TODO(sougou): handle degenerate cases like sequence, etc. - // We currently assume the primary vindex is the best way to filter, which may not be true. - inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s', '%s')", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]), vtable.ColumnVindexes[0].Type, bls.Shard) - } - filter = fmt.Sprintf("select * from %s%s", rule.Match, inKeyrange) - } - reverseBls.Filter.Rules = append(reverseBls.Filter.Rules, &binlogdatapb.Rule{ - Match: rule.Match, - Filter: filter, - }) - } - - _, err := mi.wr.VReplicationExec(ctx, source.master.Alias, binlogplayer.CreateVReplicationState("ReversedResharding", reverseBls, target.position, binlogplayer.BlpStopped, source.master.DbName())) - return err - }) -} - func (mi *migrater) allowTargetWrites(ctx context.Context) error { if mi.migrationType == binlogdatapb.MigrationType_TABLES { return mi.allowTableTargetWrites(ctx) @@ -810,13 +875,30 @@ func (mi *migrater) changeShardRouting(ctx context.Context) error { return mi.wr.ts.MigrateServedType(ctx, mi.targetKeyspace, mi.targetShards(), mi.sourceShards(), topodatapb.TabletType_MASTER, nil) } -func (mi *migrater) deleteTargetVReplication(ctx context.Context) { - _ = mi.forAllTargets(func(target *miTarget) error { +func (mi *migrater) startReverseVReplication(ctx context.Context) error { + return mi.forAllSources(func(source *miSource) error { + query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s", encodeString(source.master.DbName())) + _, err := mi.wr.VReplicationExec(ctx, source.master.Alias, query) + return err + }) +} + +func (mi *migrater) deleteTargetVReplication(ctx context.Context) error { + // Mark target streams as frozen before deleting. If MigrateWrites gets + // re-invoked after a freeze, it will skip all the previous steps and + // jump directly here for the final cleanup. + err := mi.forAllTargets(func(target *miTarget) error { + query := fmt.Sprintf("update _vt.vreplication set message = '%s' where db_name=%s and workflow=%s", frozenStr, encodeString(target.master.DbName()), encodeString(mi.workflow)) + _, err := mi.wr.tmc.VReplicationExec(ctx, target.master.Tablet, query) + return err + }) + if err != nil { + return err + } + return mi.forAllTargets(func(target *miTarget) error { query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow=%s", encodeString(target.master.DbName()), encodeString(mi.workflow)) - if _, err := mi.wr.tmc.VReplicationExec(ctx, target.master.Tablet, query); err != nil { - mi.wr.Logger().Errorf("Final cleanup: could not delete vreplication, please delete stopped streams manually: %v", err) - } - return nil + _, err := mi.wr.tmc.VReplicationExec(ctx, target.master.Tablet, query) + return err }) } @@ -918,3 +1000,11 @@ func (wr *Wrangler) saveRoutingRules(ctx context.Context, rules map[string][]str } return wr.ts.SaveRoutingRules(ctx, rrs) } + +func reverseName(workflow string) string { + const reverse = "_reverse" + if strings.HasSuffix(workflow, reverse) { + return workflow[:len(workflow)-len(reverse)] + } + return workflow + reverse +} diff --git a/go/vt/wrangler/migrater_env_test.go b/go/vt/wrangler/migrater_env_test.go index 6763927c671..4829f41b1d7 100644 --- a/go/vt/wrangler/migrater_env_test.go +++ b/go/vt/wrangler/migrater_env_test.go @@ -37,8 +37,8 @@ import ( "vitess.io/vitess/go/vt/vttablet/tmclient" ) -const vreplQueryks = "select id, source from _vt.vreplication where workflow='test' and db_name='vt_ks'" -const vreplQueryks2 = "select id, source from _vt.vreplication where workflow='test' and db_name='vt_ks2'" +const vreplQueryks = "select id, source, message from _vt.vreplication where workflow='test' and db_name='vt_ks'" +const vreplQueryks2 = "select id, source, message from _vt.vreplication where workflow='test' and db_name='vt_ks2'" type testMigraterEnv struct { ts *topo.Server @@ -138,11 +138,11 @@ func newTestTableMigrater(ctx context.Context, t *testing.T) *testMigraterEnv { }}, }, } - rows = append(rows, fmt.Sprintf("%d|%v", j+1, bls)) + rows = append(rows, fmt.Sprintf("%d|%v|", j+1, bls)) } tme.dbTargetClients[i].addInvariant(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|source", - "int64|varchar"), + "id|source|message", + "int64|varchar|varchar"), rows...), ) } @@ -257,11 +257,11 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe }}, }, } - rows = append(rows, fmt.Sprintf("%d|%v", j+1, bls)) + rows = append(rows, fmt.Sprintf("%d|%v|", j+1, bls)) } tme.dbTargetClients[i].addInvariant(vreplQueryks, sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|source", - "int64|varchar"), + "id|source|message", + "int64|varchar|varchar"), rows...), ) } @@ -371,23 +371,48 @@ func (tme *testShardMigraterEnv) expectWaitForCatchup() { }) } -func (tme *testShardMigraterEnv) expectCreateJournals() { +func (tme *testShardMigraterEnv) expectDeleteReverseVReplication() { + // NOTE: this is not a faithful reproduction of what should happen. + // The ids returned are not accurate. for _, dbclient := range tme.dbSourceClients { - dbclient.addQueryRE("insert into _vt.resharding_journal.*", &sqltypes.Result{}, nil) + dbclient.addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid12, nil) + dbclient.addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) + dbclient.addQuery("delete from _vt.copy_state where vrepl_id in (1, 2)", &sqltypes.Result{}, nil) } } -func (tme *testShardMigraterEnv) expectCreateReverseReplication() { +func (tme *testShardMigraterEnv) expectCreateReverseVReplication() { + tme.expectDeleteReverseVReplication() tme.forAllStreams(func(i, j int) { - tme.dbSourceClients[j].addQueryRE(fmt.Sprintf("insert into _vt.vreplication.*%s.*%s.*MariaDB/5-456-893.*Stopped", tme.targetShards[i], tme.sourceShards[j]), &sqltypes.Result{InsertID: uint64(j + 1)}, nil) + tme.dbSourceClients[j].addQueryRE(fmt.Sprintf("insert into _vt.vreplication.*%s.*%s.*MariaDB/5-456-893.*Stopped", tme.targetShards[i], key.KeyRangeString(tme.sourceKeyRanges[j])), &sqltypes.Result{InsertID: uint64(j + 1)}, nil) tme.dbSourceClients[j].addQuery(fmt.Sprintf("select * from _vt.vreplication where id = %d", j+1), stoppedResult(j+1), nil) }) } +func (tme *testShardMigraterEnv) expectCreateJournals() { + for _, dbclient := range tme.dbSourceClients { + dbclient.addQueryRE("insert into _vt.resharding_journal.*", &sqltypes.Result{}, nil) + } +} + +func (tme *testShardMigraterEnv) expectStartReverseVReplication() { + for _, dbclient := range tme.dbSourceClients { + dbclient.addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil) + dbclient.addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) + dbclient.addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) + dbclient.addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) + } +} + func (tme *testShardMigraterEnv) expectDeleteTargetVReplication() { // NOTE: this is not a faithful reproduction of what should happen. // The ids returned are not accurate. for _, dbclient := range tme.dbTargetClients { + dbclient.addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid12, nil) + dbclient.addQuery("update _vt.vreplication set message = 'FROZEN' where id in (1, 2)", &sqltypes.Result{}, nil) + dbclient.addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + dbclient.addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + dbclient.addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid12, nil) dbclient.addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) dbclient.addQuery("delete from _vt.copy_state where vrepl_id in (1, 2)", &sqltypes.Result{}, nil) @@ -398,4 +423,9 @@ func (tme *testShardMigraterEnv) expectCancelMigration() { for _, dbclient := range tme.dbTargetClients { dbclient.addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", &sqltypes.Result{}, nil) } + for _, dbclient := range tme.dbSourceClients { + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) + dbclient.addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) + } + tme.expectDeleteReverseVReplication() } diff --git a/go/vt/wrangler/migrater_test.go b/go/vt/wrangler/migrater_test.go index a2a2ed2960a..29d2c30625b 100644 --- a/go/vt/wrangler/migrater_test.go +++ b/go/vt/wrangler/migrater_test.go @@ -234,7 +234,7 @@ func TestTableMigrateMainflow(t *testing.T) { //------------------------------------------------------------------------------------------------------------------- // Can't migrate writes if REPLICA and RDONLY have not fully migrated yet. - _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true) want = "missing tablet type specific routing, read-only traffic must be migrated before migrating writes" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateWrites err: %v, want %v", err, want) @@ -278,6 +278,14 @@ func TestTableMigrateMainflow(t *testing.T) { } checkJournals() + deleteReverseReplicaion := func() { + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid34, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid34, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.vreplication where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.vreplication where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.copy_state where vrepl_id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (3, 4)", &sqltypes.Result{}, nil) + } cancelMigration := func() { tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid12, nil) tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid12, nil) @@ -287,10 +295,12 @@ func TestTableMigrateMainflow(t *testing.T) { tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 2", runningResult(2), nil) tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 1", runningResult(1), nil) tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", runningResult(2), nil) + + deleteReverseReplicaion() } cancelMigration() - _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 0*time.Second) + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 0*time.Second, true) want = "DeadlineExceeded" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateWrites(0 timeout) err: %v, must contain %v", err, want) @@ -352,6 +362,20 @@ func TestTableMigrateMainflow(t *testing.T) { } waitForCatchup() + createReverseVReplication := func() { + deleteReverseReplicaion() + + tme.dbSourceClients[0].addQueryRE("insert into _vt.vreplication.*test_reverse.*ks2.*-80.*t1.*in_keyrange.*c1.*hash.*-40.*t2.*-40.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) + tme.dbSourceClients[0].addQueryRE("insert into _vt.vreplication.*test_reverse.*ks2.*80-.*t1.*in_keyrange.*c1.*hash.*-40.*t2.*-40.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 2}, nil) + tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*test_reverse.*ks2.*-80.*t1.*in_keyrange.*c1.*hash.*40-.*t2.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) + tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*test_reverse.*ks2.*80-.*t1.*in_keyrange.*c1.*hash.*40-.*t2.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 2}, nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + } + createReverseVReplication() + createJournals := func() { journal1 := "insert into _vt.resharding_journal.*7672494164556733923,.*tables.*t1.*t2.*local_position.*MariaDB/5-456-892.*shard_gtids.*-80.*MariaDB/5-456-893.*participants.*40.*40" tme.dbSourceClients[0].addQueryRE(journal1, &sqltypes.Result{}, nil) @@ -360,19 +384,28 @@ func TestTableMigrateMainflow(t *testing.T) { } createJournals() - createReverseReplication := func() { - tme.dbSourceClients[0].addQueryRE("insert into _vt.vreplication.*ks2.*-80.*t1.*in_keyrange.*c1.*hash.*-40.*t2.*-40.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) - tme.dbSourceClients[0].addQueryRE("insert into _vt.vreplication.*ks2.*80-.*t1.*in_keyrange.*c1.*hash.*-40.*t2.*-40.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 2}, nil) - tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*ks2.*-80.*t1.*in_keyrange.*c1.*hash.*40-.*t2.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) - tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*ks2.*80-.*t1.*in_keyrange.*c1.*hash.*40-.*t2.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 2}, nil) - tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) - tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) - tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) - tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + startReverseVReplication := func() { + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1'", resultid34, nil) + tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1'", resultid34, nil) + tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) } - createReverseReplication() + startReverseVReplication() deleteTargetVReplication := func() { + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid12, nil) + tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid12, nil) + tme.dbTargetClients[0].addQuery("update _vt.vreplication set message = 'FROZEN' where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + tme.dbTargetClients[1].addQuery("update _vt.vreplication set message = 'FROZEN' where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid12, nil) tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid12, nil) tme.dbTargetClients[0].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) @@ -382,7 +415,7 @@ func TestTableMigrateMainflow(t *testing.T) { } deleteTargetVReplication() - journalID, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) + journalID, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true) if err != nil { t.Fatal(err) } @@ -515,7 +548,7 @@ func TestShardMigrateMainflow(t *testing.T) { //------------------------------------------------------------------------------------------------------------------- // Can't migrate writes if REPLICA and RDONLY have not fully migrated yet. - _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true) want = "cannot migrate MASTER away" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateWrites err: %v, want %v", err, want) @@ -546,22 +579,38 @@ func TestShardMigrateMainflow(t *testing.T) { checkJournals() stopStreams := func() { - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks'", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) } stopStreams() + deleteReverseReplicaion := func() { + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid3, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.vreplication where id in (3)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.vreplication where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.copy_state where vrepl_id in (3)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (3, 4)", &sqltypes.Result{}, nil) + } cancelMigration := func() { + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid12, nil) tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid2, nil) tme.dbTargetClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (1, 2)", &sqltypes.Result{}, nil) tme.dbTargetClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 1", runningResult(1), nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 2", runningResult(2), nil) + tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", runningResult(2), nil) + + deleteReverseReplicaion() } cancelMigration() - _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 0*time.Second) + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 0*time.Second, true) want = "DeadlineExceeded" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateWrites(0 timeout) err: %v, must contain %v", err, want) @@ -608,6 +657,18 @@ func TestShardMigrateMainflow(t *testing.T) { } waitForCatchup() + createReverseVReplication := func() { + deleteReverseReplicaion() + + tme.dbSourceClients[0].addQueryRE("insert into _vt.vreplication.*-80.*-40.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) + tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*-80.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) + tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*80-.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 2}, nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + } + createReverseVReplication() + createJournals := func() { journal1 := "insert into _vt.resharding_journal.*6432976123657117097.*migration_type:SHARDS.*local_position.*MariaDB/5-456-892.*shard_gtids.*-80.*MariaDB/5-456-893.*participants.*40.*40" tme.dbSourceClients[0].addQueryRE(journal1, &sqltypes.Result{}, nil) @@ -616,17 +677,27 @@ func TestShardMigrateMainflow(t *testing.T) { } createJournals() - createReverseReplication := func() { - tme.dbSourceClients[0].addQueryRE("insert into _vt.vreplication.*-80.*-40.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) - tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*-80.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) - tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*80-.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 2}, nil) - tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) - tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) - tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + startReverseVReplication := func() { + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil) + tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil) + tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) } - createReverseReplication() + startReverseVReplication() deleteTargetVReplication := func() { + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid12, nil) + tme.dbTargetClients[0].addQuery("update _vt.vreplication set message = 'FROZEN' where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid2, nil) + tme.dbTargetClients[1].addQuery("update _vt.vreplication set message = 'FROZEN' where id in (2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid12, nil) tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid2, nil) tme.dbTargetClients[0].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) @@ -636,7 +707,7 @@ func TestShardMigrateMainflow(t *testing.T) { } deleteTargetVReplication() - journalID, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) + journalID, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true) if err != nil { t.Fatal(err) } @@ -710,11 +781,34 @@ func TestMigrateFailJournal(t *testing.T) { tme.dbTargetClients[1].addQuery(cancel1, &sqltypes.Result{}, nil) tme.dbTargetClients[1].addQuery(cancel2, &sqltypes.Result{}, nil) + deleteReverseReplicaion := func() { + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid34, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid34, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.vreplication where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.vreplication where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.copy_state where vrepl_id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (3, 4)", &sqltypes.Result{}, nil) + } + + createReverseVReplication := func() { + deleteReverseReplicaion() + + tme.dbSourceClients[0].addQueryRE("insert into _vt.vreplication.*test_reverse.*ks2.*-80.*t1.*in_keyrange.*c1.*hash.*-40.*t2.*-40.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) + tme.dbSourceClients[0].addQueryRE("insert into _vt.vreplication.*test_reverse.*ks2.*80-.*t1.*in_keyrange.*c1.*hash.*-40.*t2.*-40.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 2}, nil) + tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*test_reverse.*ks2.*-80.*t1.*in_keyrange.*c1.*hash.*40-.*t2.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) + tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*test_reverse.*ks2.*80-.*t1.*in_keyrange.*c1.*hash.*40-.*t2.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 2}, nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + } + createReverseVReplication() + // Make the journal call fail. tme.dbSourceClients[0].addQueryRE("insert into _vt.resharding_journal", nil, errors.New("journaling intentionally failed")) tme.dbSourceClients[1].addQueryRE("insert into _vt.resharding_journal", nil, errors.New("journaling intentionally failed")) - _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true) want := "journaling intentionally failed" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateWrites(0 timeout) err: %v, must contain %v", err, want) @@ -750,21 +844,30 @@ func TestTableMigrateJournalExists(t *testing.T) { tme.dbSourceClients[0].addQuery("select val from _vt.resharding_journal where id=7672494164556733923", sqltypes.MakeTestResult(sqltypes.MakeTestFields("val", "varbinary"), ""), nil) tme.dbSourceClients[1].addQuery("select val from _vt.resharding_journal where id=7672494164556733923", &sqltypes.Result{}, nil) - // mi.creaetJournals: Create the missing journal. + // mi.createJournals: Create the missing journal. journal2 := "insert into _vt.resharding_journal.*7672494164556733923,.*tables.*t1.*t2.*local_position.*MariaDB/5-456-892.*shard_gtids.*80.*MariaDB/5-456-893.*80.*participants.*40.*40" tme.dbSourceClients[1].addQueryRE(journal2, &sqltypes.Result{}, nil) - // mi.createReverseReplication - tme.dbSourceClients[0].addQueryRE("insert into _vt.vreplication.*ks2.*-80.*t1.*in_keyrange.*c1.*hash.*-40.*t2.*-40.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) - tme.dbSourceClients[0].addQueryRE("insert into _vt.vreplication.*ks2.*80-.*t1.*in_keyrange.*c1.*hash.*-40.*t2.*-40.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 2}, nil) - tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*ks2.*-80.*t1.*in_keyrange.*c1.*hash.*40-.*t2.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) - tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*ks2.*80-.*t1.*in_keyrange.*c1.*hash.*40-.*t2.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 2}, nil) - tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) - tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) - tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) - tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + // mi.startReverseVReplication + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1'", resultid34, nil) + tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1'", resultid34, nil) + tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) // mi.deleteTargetVReplication + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid12, nil) + tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid12, nil) + tme.dbTargetClients[0].addQuery("update _vt.vreplication set message = 'FROZEN' where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + tme.dbTargetClients[1].addQuery("update _vt.vreplication set message = 'FROZEN' where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid12, nil) tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid12, nil) tme.dbTargetClients[0].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) @@ -772,7 +875,7 @@ func TestTableMigrateJournalExists(t *testing.T) { tme.dbTargetClients[1].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) tme.dbTargetClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2)", &sqltypes.Result{}, nil) - _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true) if err != nil { t.Fatal(err) } @@ -816,15 +919,25 @@ func TestShardMigrateJournalExists(t *testing.T) { journal2 := "insert into _vt.resharding_journal.*6432976123657117097.*migration_type:SHARDS.*local_position.*MariaDB/5-456-892.*shard_gtids.*80.*MariaDB/5-456-893.*shard_gtids.*80.*MariaDB/5-456-893.*participants.*40.*40" tme.dbSourceClients[1].addQueryRE(journal2, &sqltypes.Result{}, nil) - // mi.createReverseReplication - tme.dbSourceClients[0].addQueryRE("insert into _vt.vreplication.*-80.*-40.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) - tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*-80.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) - tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*80-.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 2}, nil) - tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) - tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) - tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + // mi.startReverseVReplication + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil) + tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil) + tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) // mi.deleteTargetVReplication + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid12, nil) + tme.dbTargetClients[0].addQuery("update _vt.vreplication set message = 'FROZEN' where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid2, nil) + tme.dbTargetClients[1].addQuery("update _vt.vreplication set message = 'FROZEN' where id in (2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid12, nil) tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid2, nil) tme.dbTargetClients[0].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) @@ -832,7 +945,7 @@ func TestShardMigrateJournalExists(t *testing.T) { tme.dbTargetClients[1].addQuery("delete from _vt.vreplication where id in (2)", &sqltypes.Result{}, nil) tme.dbTargetClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (2)", &sqltypes.Result{}, nil) - _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true) if err != nil { t.Fatal(err) } @@ -850,6 +963,174 @@ func TestShardMigrateJournalExists(t *testing.T) { verifyQueries(t, tme.allDBClients) } +func TestTableMigrateNoReverse(t *testing.T) { + ctx := context.Background() + tme := newTestTableMigrater(ctx, t) + defer tme.stopTablets(t) + + err := tme.wr.MigrateReads(ctx, tme.targetKeyspace, "test", topodatapb.TabletType_RDONLY, nil, DirectionForward) + if err != nil { + t.Fatal(err) + } + err = tme.wr.MigrateReads(ctx, tme.targetKeyspace, "test", topodatapb.TabletType_REPLICA, nil, DirectionForward) + if err != nil { + t.Fatal(err) + } + + checkJournals := func() { + tme.dbSourceClients[0].addQuery("select val from _vt.resharding_journal where id=7672494164556733923", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select val from _vt.resharding_journal where id=7672494164556733923", &sqltypes.Result{}, nil) + } + checkJournals() + + waitForCatchup := func() { + // mi.waitForCatchup-> mi.wr.tmc.VReplicationWaitForPos + state := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "pos|state|message", + "varchar|varchar|varchar"), + "MariaDB/5-456-892|Running", + ) + tme.dbTargetClients[0].addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) + tme.dbTargetClients[0].addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil) + tme.dbTargetClients[1].addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) + tme.dbTargetClients[1].addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil) + + // mi.waitForCatchup-> mi.wr.tmc.VReplicationExec('Stopped') + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where id = 1", resultid1, nil) + tme.dbTargetClients[0].addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where id = 2", resultid2, nil) + tme.dbTargetClients[0].addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where id = 1", resultid1, nil) + tme.dbTargetClients[1].addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", &sqltypes.Result{}, nil) + tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where id = 2", resultid2, nil) + tme.dbTargetClients[1].addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + } + waitForCatchup() + + deleteReverseReplicaion := func() { + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid34, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid34, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.vreplication where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.vreplication where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.copy_state where vrepl_id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (3, 4)", &sqltypes.Result{}, nil) + } + + createReverseVReplication := func() { + deleteReverseReplicaion() + + tme.dbSourceClients[0].addQueryRE("insert into _vt.vreplication.*test_reverse.*ks2.*-80.*t1.*in_keyrange.*c1.*hash.*-40.*t2.*-40.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) + tme.dbSourceClients[0].addQueryRE("insert into _vt.vreplication.*test_reverse.*ks2.*80-.*t1.*in_keyrange.*c1.*hash.*-40.*t2.*-40.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 2}, nil) + tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*test_reverse.*ks2.*-80.*t1.*in_keyrange.*c1.*hash.*40-.*t2.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) + tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*test_reverse.*ks2.*80-.*t1.*in_keyrange.*c1.*hash.*40-.*t2.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 2}, nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + } + createReverseVReplication() + + createJournals := func() { + journal1 := "insert into _vt.resharding_journal.*7672494164556733923,.*tables.*t1.*t2.*local_position.*MariaDB/5-456-892.*shard_gtids.*-80.*MariaDB/5-456-893.*participants.*40.*40" + tme.dbSourceClients[0].addQueryRE(journal1, &sqltypes.Result{}, nil) + journal2 := "insert into _vt.resharding_journal.*7672494164556733923,.*tables.*t1.*t2.*local_position.*MariaDB/5-456-892.*shard_gtids.*80.*MariaDB/5-456-893.*80.*participants.*40.*40" + tme.dbSourceClients[1].addQueryRE(journal2, &sqltypes.Result{}, nil) + } + createJournals() + + deleteTargetVReplication := func() { + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid12, nil) + tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid12, nil) + tme.dbTargetClients[0].addQuery("update _vt.vreplication set message = 'FROZEN' where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + tme.dbTargetClients[1].addQuery("update _vt.vreplication set message = 'FROZEN' where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid12, nil) + tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid12, nil) + tme.dbTargetClients[0].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[1].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2)", &sqltypes.Result{}, nil) + } + deleteTargetVReplication() + + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false) + if err != nil { + t.Fatal(err) + } + verifyQueries(t, tme.allDBClients) +} + +func TestMigrateFrozen(t *testing.T) { + ctx := context.Background() + tme := newTestTableMigrater(ctx, t) + defer tme.stopTablets(t) + + err := tme.wr.MigrateReads(ctx, tme.targetKeyspace, "test", topodatapb.TabletType_RDONLY, nil, DirectionForward) + if err != nil { + t.Fatal(err) + } + + err = tme.wr.MigrateReads(ctx, tme.targetKeyspace, "test", topodatapb.TabletType_REPLICA, nil, DirectionForward) + if err != nil { + t.Fatal(err) + } + + bls1 := &binlogdatapb.BinlogSource{ + Keyspace: "ks1", + Shard: "-40", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "/.*", + Filter: "", + }}, + }, + } + tme.dbTargetClients[0].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message", + "int64|varchar|varchar"), + fmt.Sprintf("1|%v|FROZEN", bls1), + ), nil) + tme.dbTargetClients[1].addQuery(vreplQueryks2, &sqltypes.Result{}, nil) + + err = tme.wr.MigrateReads(ctx, tme.targetKeyspace, "test", topodatapb.TabletType_REPLICA, nil, DirectionForward) + want := "cannot migrate reads while MigrateWrites is in progress" + if err == nil || err.Error() != want { + t.Errorf("MigrateReads(frozen) err: %v, want %v", err, want) + } + + tme.dbTargetClients[0].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message", + "int64|varchar|varchar"), + fmt.Sprintf("1|%v|FROZEN", bls1), + ), nil) + tme.dbTargetClients[1].addQuery(vreplQueryks2, &sqltypes.Result{}, nil) + + deleteTargetVReplication := func() { + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid1, nil) + tme.dbTargetClients[0].addQuery("update _vt.vreplication set message = 'FROZEN' where id in (1)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", resultid1, nil) + tme.dbTargetClients[0].addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil) + } + deleteTargetVReplication() + + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 0*time.Second, true) + if err != nil { + t.Fatal(err) + } + verifyQueries(t, tme.allDBClients) +} + func TestMigrateNoStreamsFound(t *testing.T) { ctx := context.Background() tme := newTestTableMigrater(ctx, t) @@ -884,9 +1165,9 @@ func TestMigrateDistinctSources(t *testing.T) { }, } tme.dbTargetClients[0].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|source", - "int64|varchar"), - fmt.Sprintf("1|%v", bls), + "id|source|message", + "int64|varchar|varchar"), + fmt.Sprintf("1|%v|", bls), ), nil) err := tme.wr.MigrateReads(ctx, tme.targetKeyspace, "test", topodatapb.TabletType_RDONLY, nil, DirectionForward) @@ -912,9 +1193,9 @@ func TestMigrateMismatchedTables(t *testing.T) { }, } tme.dbTargetClients[0].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|source", - "int64|varchar"), - fmt.Sprintf("1|%v", bls)), + "id|source|message", + "int64|varchar|varchar"), + fmt.Sprintf("1|%v|", bls)), nil, ) @@ -965,10 +1246,10 @@ func TestMigrateNoTableWildcards(t *testing.T) { }, } tme.dbTargetClients[0].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|source", - "int64|varchar"), - fmt.Sprintf("1|%v", bls1), - fmt.Sprintf("2|%v", bls2), + "id|source|message", + "int64|varchar|varchar"), + fmt.Sprintf("1|%v|", bls1), + fmt.Sprintf("2|%v|", bls2), ), nil) bls3 := &binlogdatapb.BinlogSource{ Keyspace: "ks1", @@ -981,9 +1262,9 @@ func TestMigrateNoTableWildcards(t *testing.T) { }, } tme.dbTargetClients[1].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|source", - "int64|varchar"), - fmt.Sprintf("1|%v", bls3), + "id|source|message", + "int64|varchar|varchar"), + fmt.Sprintf("1|%v|", bls3), ), nil) err := tme.wr.MigrateReads(ctx, tme.targetKeyspace, "test", topodatapb.TabletType_RDONLY, nil, DirectionForward) @@ -993,6 +1274,23 @@ func TestMigrateNoTableWildcards(t *testing.T) { } } +func TestReverseName(t *testing.T) { + tests := []struct { + in, out string + }{{ + in: "aa", + out: "aa_reverse", + }, { + in: "aa_reverse", + out: "aa", + }} + for _, test := range tests { + if got, want := reverseName(test.in), test.out; got != want { + t.Errorf("reverseName(%s): %s, want %s", test.in, got, test.out) + } + } +} + func checkRouting(t *testing.T, wr *Wrangler, want map[string][]string) { t.Helper() ctx := context.Background() diff --git a/go/vt/wrangler/stream_migrater.go b/go/vt/wrangler/stream_migrater.go index 7d167c40759..f4737eedab0 100644 --- a/go/vt/wrangler/stream_migrater.go +++ b/go/vt/wrangler/stream_migrater.go @@ -41,7 +41,10 @@ import ( ) type streamMigrater struct { - mi *migrater + streams map[string][]*vrStream + workflows []string + templates []*vrStream + mi *migrater } type vrStream struct { @@ -51,24 +54,28 @@ type vrStream struct { pos mysql.Position } -func (sm *streamMigrater) stopStreams(ctx context.Context) ([]*vrStream, error) { +func buildStreamMigrater(ctx context.Context, mi *migrater) (*streamMigrater, error) { + sm := &streamMigrater{mi: mi} if sm.mi.migrationType == binlogdatapb.MigrationType_TABLES { // Source streams should be stopped only for shard migrations. - return nil, nil + return sm, nil } streams, err := sm.readSourceStreams(ctx) if err != nil { return nil, err } - streams, err = sm.stopSourceStreams(ctx, streams) - if err != nil { - return nil, err - } - positions, err := sm.syncSourceStreams(ctx, streams) - if err != nil { - return nil, err + sm.streams = streams + + // Loop executes only once. + for _, tabletStreams := range sm.streams { + tmpl, err := sm.templatize(ctx, tabletStreams) + if err != nil { + return nil, err + } + sm.workflows = tabletStreamWorkflows(tmpl) + return sm, nil } - return sm.verifyStreamPositions(ctx, streams, positions) + return sm, nil } func (sm *streamMigrater) readSourceStreams(ctx context.Context) (map[string][]*vrStream, error) { @@ -151,12 +158,26 @@ func (sm *streamMigrater) readSourceStreams(ctx context.Context) (map[string][]* return streams, nil } +func (sm *streamMigrater) stopStreams(ctx context.Context) ([]string, error) { + if sm.streams == nil { + return nil, nil + } + if err := sm.stopSourceStreams(ctx); err != nil { + return nil, err + } + positions, err := sm.syncSourceStreams(ctx) + if err != nil { + return nil, err + } + return sm.verifyStreamPositions(ctx, positions) +} + func (sm *streamMigrater) readTabletStreams(ctx context.Context, ti *topo.TabletInfo, constraint string) ([]*vrStream, error) { var query string if constraint == "" { - query = fmt.Sprintf("select id, workflow, source, pos from _vt.vreplication where db_name=%s", encodeString(ti.DbName())) + query = fmt.Sprintf("select id, workflow, source, pos from _vt.vreplication where db_name=%s and workflow != %s", encodeString(ti.DbName()), encodeString(sm.mi.reverseWorkflow)) } else { - query = fmt.Sprintf("select id, workflow, source, pos from _vt.vreplication where db_name=%s and %s", encodeString(ti.DbName()), constraint) + query = fmt.Sprintf("select id, workflow, source, pos from _vt.vreplication where db_name=%s and workflow != %s and %s", encodeString(ti.DbName()), encodeString(sm.mi.reverseWorkflow), constraint) } p3qr, err := sm.mi.wr.tmc.VReplicationExec(ctx, ti.Tablet, query) if err != nil { @@ -195,11 +216,11 @@ func (sm *streamMigrater) readTabletStreams(ctx context.Context, ti *topo.Tablet return tabletStreams, nil } -func (sm *streamMigrater) stopSourceStreams(ctx context.Context, streams map[string][]*vrStream) (map[string][]*vrStream, error) { +func (sm *streamMigrater) stopSourceStreams(ctx context.Context) error { stoppedStreams := make(map[string][]*vrStream) var mu sync.Mutex err := sm.mi.forAllSources(func(source *miSource) error { - tabletStreams := streams[source.si.ShardName()] + tabletStreams := sm.streams[source.si.ShardName()] if len(tabletStreams) == 0 { return nil } @@ -218,14 +239,15 @@ func (sm *streamMigrater) stopSourceStreams(ctx context.Context, streams map[str return nil }) if err != nil { - return nil, err + return err } - return stoppedStreams, nil + sm.streams = stoppedStreams + return nil } -func (sm *streamMigrater) syncSourceStreams(ctx context.Context, streams map[string][]*vrStream) (map[string]mysql.Position, error) { +func (sm *streamMigrater) syncSourceStreams(ctx context.Context) (map[string]mysql.Position, error) { stopPositions := make(map[string]mysql.Position) - for _, tabletStreams := range streams { + for _, tabletStreams := range sm.streams { for _, vrs := range tabletStreams { key := fmt.Sprintf("%s:%s", vrs.bls.Keyspace, vrs.bls.Shard) pos, ok := stopPositions[key] @@ -236,7 +258,7 @@ func (sm *streamMigrater) syncSourceStreams(ctx context.Context, streams map[str } var wg sync.WaitGroup allErrors := &concurrency.AllErrorRecorder{} - for _, tabletStreams := range streams { + for _, tabletStreams := range sm.streams { for _, vrs := range tabletStreams { key := fmt.Sprintf("%s:%s", vrs.bls.Keyspace, vrs.bls.Shard) pos := stopPositions[key] @@ -272,11 +294,11 @@ func (sm *streamMigrater) syncSourceStreams(ctx context.Context, streams map[str return stopPositions, allErrors.AggrError(vterrors.Aggregate) } -func (sm *streamMigrater) verifyStreamPositions(ctx context.Context, streams map[string][]*vrStream, stopPositions map[string]mysql.Position) ([]*vrStream, error) { +func (sm *streamMigrater) verifyStreamPositions(ctx context.Context, stopPositions map[string]mysql.Position) ([]string, error) { stoppedStreams := make(map[string][]*vrStream) var mu sync.Mutex err := sm.mi.forAllSources(func(source *miSource) error { - tabletStreams := streams[source.si.ShardName()] + tabletStreams := sm.streams[source.si.ShardName()] if len(tabletStreams) == 0 { return nil } @@ -292,6 +314,11 @@ func (sm *streamMigrater) verifyStreamPositions(ctx context.Context, streams map if err != nil { return nil, err } + + // This is not really required because it's not used later. + // But we keep it up-to-date for good measure. + sm.streams = stoppedStreams + var oneSet []*vrStream allErrors := &concurrency.AllErrorRecorder{} for _, tabletStreams := range stoppedStreams { @@ -306,29 +333,29 @@ func (sm *streamMigrater) verifyStreamPositions(ctx context.Context, streams map } } } - return oneSet, allErrors.AggrError(vterrors.Aggregate) + if allErrors.HasErrors() { + return nil, allErrors.AggrError(vterrors.Aggregate) + } + sm.templates, err = sm.templatize(ctx, oneSet) + if err != nil { + // Unreachable: we've already templatized this before. + return nil, err + } + return tabletStreamWorkflows(sm.templates), allErrors.AggrError(vterrors.Aggregate) } -func (sm *streamMigrater) migrateStreams(ctx context.Context, tabletStreams []*vrStream) ([]string, error) { - if sm.mi.migrationType == binlogdatapb.MigrationType_TABLES { - return nil, nil +func (sm *streamMigrater) migrateStreams(ctx context.Context) error { + if sm.streams == nil { + return nil } // Delete any previous stray workflows that might have been left-over // due to a failed migration. - if err := sm.deleteTargetStreams(ctx, tabletStreams); err != nil { - return nil, err + if err := sm.deleteTargetStreams(ctx); err != nil { + return err } - tmpl, err := sm.templatize(ctx, tabletStreams) - if err != nil { - return nil, err - } - workflows := tabletStreamWorkflows(tmpl) - if err := sm.createTargetStreams(ctx, tmpl); err != nil { - return nil, err - } - return workflows, nil + return sm.createTargetStreams(ctx, sm.templates) } const ( @@ -462,8 +489,7 @@ func (sm *streamMigrater) createTargetStreams(ctx context.Context, tmpl []*vrStr for _, rule := range vrs.bls.Filter.Rules { buf := &strings.Builder{} t := template.Must(template.New("").Parse(rule.Filter)) - err := t.Execute(buf, target.si.ShardName()) - if err != nil { + if err := t.Execute(buf, key.KeyRangeString(target.si.KeyRange)); err != nil { return err } rule.Filter = buf.String() @@ -492,25 +518,15 @@ func (sm *streamMigrater) createTargetStreams(ctx context.Context, tmpl []*vrStr } func (sm *streamMigrater) cancelMigration(ctx context.Context) { - if sm.mi.migrationType == binlogdatapb.MigrationType_TABLES { - return - } - tabletStreams, err := sm.readSourceStreamsForCancel(ctx) - if err != nil { - sm.mi.wr.Logger().Errorf("Cancel migration failed: could not read streams metadata: %v", err) + if sm.streams == nil { return } // Ignore error. We still want to restart the source streams if deleteTargetStreams fails. - _ = sm.deleteTargetStreams(ctx, tabletStreams) + _ = sm.deleteTargetStreams(ctx) - workflows := tabletStreamWorkflows(tabletStreams) - if len(workflows) == 0 { - return - } - workflowList := stringListify(workflows) - err = sm.mi.forAllSources(func(source *miSource) error { - query := fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos=null, message='' where db_name=%s and workflow in (%s)", encodeString(source.master.DbName()), workflowList) + err := sm.mi.forAllSources(func(source *miSource) error { + query := fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos=null, message='' where db_name=%s and workflow != %s", encodeString(source.master.DbName()), encodeString(sm.mi.reverseWorkflow)) _, err := sm.mi.wr.VReplicationExec(ctx, source.master.Alias, query) return err }) @@ -519,12 +535,11 @@ func (sm *streamMigrater) cancelMigration(ctx context.Context) { } } -func (sm *streamMigrater) deleteTargetStreams(ctx context.Context, tabletStreams []*vrStream) error { - workflows := tabletStreamWorkflows(tabletStreams) - if len(workflows) == 0 { +func (sm *streamMigrater) deleteTargetStreams(ctx context.Context) error { + if len(sm.workflows) == 0 { return nil } - workflowList := stringListify(workflows) + workflowList := stringListify(sm.workflows) err := sm.mi.forAllTargets(func(target *miTarget) error { query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow in (%s)", encodeString(target.master.DbName()), workflowList) _, err := sm.mi.wr.VReplicationExec(ctx, target.master.Alias, query) @@ -536,57 +551,27 @@ func (sm *streamMigrater) deleteTargetStreams(ctx context.Context, tabletStreams return err } -func (sm *streamMigrater) readSourceStreamsForCancel(ctx context.Context) ([]*vrStream, error) { - streams := make(map[string][]*vrStream) - var mu sync.Mutex - err := sm.mi.forAllSources(func(source *miSource) error { - tabletStreams, err := sm.readTabletStreams(ctx, source.master, "") - if err != nil { - return err - } - if len(tabletStreams) == 0 { - return nil - } - - mu.Lock() - defer mu.Unlock() - streams[source.si.ShardName()] = tabletStreams - return nil - }) - if err != nil { - return nil, err - } - var oneSet []*vrStream - for _, tabletStream := range streams { - oneSet = tabletStream - break - } - return oneSet, nil -} - // finalize performs the final cleanup: start all the newly migrated target streams // and delete them from the source. func (sm *streamMigrater) finalize(ctx context.Context, workflows []string) error { - if sm.mi.migrationType == binlogdatapb.MigrationType_TABLES { - return nil - } if len(workflows) == 0 { return nil } workflowList := stringListify(workflows) - err := sm.mi.forAllTargets(func(target *miTarget) error { - query := fmt.Sprintf("update _vt.vreplication set state='Running' where db_name=%s and workflow in (%s)", encodeString(target.master.DbName()), workflowList) - _, err := sm.mi.wr.VReplicationExec(ctx, target.master.Alias, query) + err := sm.mi.forAllSources(func(source *miSource) error { + query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow in (%s)", encodeString(source.master.DbName()), workflowList) + _, err := sm.mi.wr.VReplicationExec(ctx, source.master.Alias, query) return err }) if err != nil { return err } - return sm.mi.forAllSources(func(source *miSource) error { - query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow in (%s)", encodeString(source.master.DbName()), workflowList) - _, err := sm.mi.wr.VReplicationExec(ctx, source.master.Alias, query) + err = sm.mi.forAllTargets(func(target *miTarget) error { + query := fmt.Sprintf("update _vt.vreplication set state='Running' where db_name=%s and workflow in (%s)", encodeString(target.master.DbName()), workflowList) + _, err := sm.mi.wr.VReplicationExec(ctx, target.master.Alias, query) return err }) + return err } func tabletStreamValues(tabletStreams []*vrStream) string { diff --git a/go/vt/wrangler/stream_migrater_test.go b/go/vt/wrangler/stream_migrater_test.go index 7cd91fe0c84..7e97aac9170 100644 --- a/go/vt/wrangler/stream_migrater_test.go +++ b/go/vt/wrangler/stream_migrater_test.go @@ -53,8 +53,8 @@ func TestStreamMigrateMainflow(t *testing.T) { stopStreams := func() { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('Stopped') - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) // pre-compute sourceRows because they're re-read multiple times. var sourceRows [][]string @@ -81,7 +81,7 @@ func TestStreamMigrateMainflow(t *testing.T) { for i, dbclient := range tme.dbSourceClients { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('') and VReplicationExec(_vt.copy_state) - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -95,14 +95,14 @@ func TestStreamMigrateMainflow(t *testing.T) { dbclient.addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) // sm.stopStreams->sm.stopSourceStreams->sm.readTabletStreams('id in...') - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), nil) // sm.stopStreams->sm.verifyStreamPositions->sm.readTabletStreams('id in...') - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -137,9 +137,16 @@ func TestStreamMigrateMainflow(t *testing.T) { journal := "insert into _vt.resharding_journal.*source_workflows.*t1t2" tme.dbSourceClients[0].addQueryRE(journal, &sqltypes.Result{}, nil) tme.dbSourceClients[1].addQueryRE(journal, &sqltypes.Result{}, nil) - tme.expectCreateReverseReplication() finalize := func() { + // sm.finalize->Source + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1t2')", resultid12, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1t2')", resultid12, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2)", &sqltypes.Result{}, nil) + // sm.finalize->Target tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1t2')", resultid34, nil) tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1t2')", resultid34, nil) @@ -149,20 +156,14 @@ func TestStreamMigrateMainflow(t *testing.T) { tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 3", stoppedResult(3), nil) tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 4", stoppedResult(4), nil) tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 4", stoppedResult(4), nil) - - // sm.finalize->Source - tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1t2')", resultid12, nil) - tme.dbSourceClients[0].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) - tme.dbSourceClients[0].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2)", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1t2')", resultid12, nil) - tme.dbSourceClients[1].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2)", &sqltypes.Result{}, nil) } finalize() + tme.expectCreateReverseVReplication() + tme.expectStartReverseVReplication() tme.expectDeleteTargetVReplication() - if _, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second); err != nil { + if _, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true); err != nil { t.Fatal(err) } @@ -198,8 +199,8 @@ func TestStreamMigrateTwoStreams(t *testing.T) { stopStreams := func() { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('Stopped') - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) // pre-compute sourceRows because they're re-read multiple times. var sourceRows [][]string @@ -239,7 +240,7 @@ func TestStreamMigrateTwoStreams(t *testing.T) { for i, dbclient := range tme.dbSourceClients { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('') and VReplicationExec(_vt.copy_state) - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -255,14 +256,14 @@ func TestStreamMigrateTwoStreams(t *testing.T) { dbclient.addQuery("select * from _vt.vreplication where id = 4", stoppedResult(3), nil) // sm.stopStreams->sm.stopSourceStreams->sm.readTabletStreams('id in...') - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and id in (1, 2, 3, 4)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and id in (1, 2, 3, 4)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), nil) // sm.stopStreams->sm.verifyStreamPositions->sm.readTabletStreams('id in...') - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and id in (1, 2, 3, 4)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and id in (1, 2, 3, 4)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -299,9 +300,16 @@ func TestStreamMigrateTwoStreams(t *testing.T) { migrateStreams() tme.expectCreateJournals() - tme.expectCreateReverseReplication() finalize := func() { + // sm.finalize->Source + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1t2', 't3')", resultid1234, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.vreplication where id in (1, 2, 3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2, 3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1t2', 't3')", resultid1234, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.vreplication where id in (1, 2, 3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2, 3, 4)", &sqltypes.Result{}, nil) + // sm.finalize->Target tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1t2', 't3')", resultid3456, nil) tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1t2', 't3')", resultid3456, nil) @@ -315,20 +323,14 @@ func TestStreamMigrateTwoStreams(t *testing.T) { tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 5", stoppedResult(5), nil) tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 6", stoppedResult(6), nil) tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 6", stoppedResult(6), nil) - - // sm.finalize->Source - tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1t2', 't3')", resultid1234, nil) - tme.dbSourceClients[0].addQuery("delete from _vt.vreplication where id in (1, 2, 3, 4)", &sqltypes.Result{}, nil) - tme.dbSourceClients[0].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2, 3, 4)", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1t2', 't3')", resultid1234, nil) - tme.dbSourceClients[1].addQuery("delete from _vt.vreplication where id in (1, 2, 3, 4)", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2, 3, 4)", &sqltypes.Result{}, nil) } finalize() + tme.expectCreateReverseVReplication() + tme.expectStartReverseVReplication() tme.expectDeleteTargetVReplication() - if _, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second); err != nil { + if _, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true); err != nil { t.Fatal(err) } @@ -364,7 +366,7 @@ func TestStreamMigrateOneToMany(t *testing.T) { stopStreams := func() { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('Stopped') - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) // pre-compute sourceRows because they're re-read multiple times. var sourceRows [][]string @@ -388,7 +390,7 @@ func TestStreamMigrateOneToMany(t *testing.T) { for i, dbclient := range tme.dbSourceClients { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('') and VReplicationExec(_vt.copy_state) - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -401,14 +403,14 @@ func TestStreamMigrateOneToMany(t *testing.T) { dbclient.addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) // sm.stopStreams->sm.stopSourceStreams->sm.readTabletStreams('id in...') - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and id in (1)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and id in (1)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), nil) // sm.stopStreams->sm.verifyStreamPositions->sm.readTabletStreams('id in...') - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and id in (1)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and id in (1)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -439,9 +441,13 @@ func TestStreamMigrateOneToMany(t *testing.T) { migrateStreams() tme.expectCreateJournals() - tme.expectCreateReverseReplication() finalize := func() { + // sm.finalize->Source + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid1, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil) + // sm.finalize->Target tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid3, nil) tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid3, nil) @@ -449,17 +455,14 @@ func TestStreamMigrateOneToMany(t *testing.T) { tme.dbTargetClients[1].addQuery("update _vt.vreplication set state = 'Running' where id in (3)", &sqltypes.Result{}, nil) tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 3", stoppedResult(3), nil) tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 3", stoppedResult(3), nil) - - // sm.finalize->Source - tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid1, nil) - tme.dbSourceClients[0].addQuery("delete from _vt.vreplication where id in (1)", &sqltypes.Result{}, nil) - tme.dbSourceClients[0].addQuery("delete from _vt.copy_state where vrepl_id in (1)", &sqltypes.Result{}, nil) } finalize() + tme.expectCreateReverseVReplication() + tme.expectStartReverseVReplication() tme.expectDeleteTargetVReplication() - if _, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second); err != nil { + if _, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true); err != nil { t.Fatal(err) } @@ -494,8 +497,8 @@ func TestStreamMigrateManyToOne(t *testing.T) { stopStreams := func() { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('Stopped') - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) // pre-compute sourceRows because they're re-read multiple times. var sourceRows [][]string @@ -519,7 +522,7 @@ func TestStreamMigrateManyToOne(t *testing.T) { for i, dbclient := range tme.dbSourceClients { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('') and VReplicationExec(_vt.copy_state) - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -533,14 +536,14 @@ func TestStreamMigrateManyToOne(t *testing.T) { dbclient.addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) // sm.stopStreams->sm.stopSourceStreams->sm.readTabletStreams('id in...') - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), nil) // sm.stopStreams->sm.verifyStreamPositions->sm.readTabletStreams('id in...') - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -569,17 +572,11 @@ func TestStreamMigrateManyToOne(t *testing.T) { } } migrateStreams() + tme.expectCreateReverseVReplication() tme.expectCreateJournals() - tme.expectCreateReverseReplication() finalize := func() { - // sm.finalize->Target - tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid34, nil) - tme.dbTargetClients[0].addQuery("update _vt.vreplication set state = 'Running' where id in (3, 4)", &sqltypes.Result{}, nil) - tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 3", stoppedResult(3), nil) - tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 4", stoppedResult(4), nil) - // sm.finalize->Source tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid12, nil) tme.dbSourceClients[0].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) @@ -587,12 +584,19 @@ func TestStreamMigrateManyToOne(t *testing.T) { tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid12, nil) tme.dbSourceClients[1].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) tme.dbSourceClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2)", &sqltypes.Result{}, nil) + + // sm.finalize->Target + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid34, nil) + tme.dbTargetClients[0].addQuery("update _vt.vreplication set state = 'Running' where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 3", stoppedResult(3), nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 4", stoppedResult(4), nil) } finalize() + tme.expectStartReverseVReplication() tme.expectDeleteTargetVReplication() - if _, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second); err != nil { + if _, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true); err != nil { t.Fatal(err) } @@ -626,8 +630,8 @@ func TestStreamMigrateSyncSuccess(t *testing.T) { stopStreams := func() { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('Stopped') - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) var sourceRows [][]string for i, sourceTargetShard := range tme.sourceShards { @@ -683,7 +687,7 @@ func TestStreamMigrateSyncSuccess(t *testing.T) { for i, dbclient := range tme.dbSourceClients { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('') and VReplicationExec(_vt.copy_state) - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -697,14 +701,14 @@ func TestStreamMigrateSyncSuccess(t *testing.T) { dbclient.addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) // sm.stopStreams->sm.stopSourceStreams->sm.readTabletStreams('id in...') - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), nil) // sm.stopStreams->sm.verifyStreamPositions->sm.readTabletStreams('id in...') - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), finalSources[i]...), @@ -754,9 +758,16 @@ func TestStreamMigrateSyncSuccess(t *testing.T) { migrateStreams() tme.expectCreateJournals() - tme.expectCreateReverseReplication() finalize := func() { + // sm.finalize->Source + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid12, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid12, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2)", &sqltypes.Result{}, nil) + // sm.finalize->Target tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid34, nil) tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid34, nil) @@ -766,20 +777,14 @@ func TestStreamMigrateSyncSuccess(t *testing.T) { tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 3", stoppedResult(3), nil) tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 4", stoppedResult(4), nil) tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 4", stoppedResult(4), nil) - - // sm.finalize->Source - tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid12, nil) - tme.dbSourceClients[0].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) - tme.dbSourceClients[0].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2)", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid12, nil) - tme.dbSourceClients[1].addQuery("delete from _vt.vreplication where id in (1, 2)", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (1, 2)", &sqltypes.Result{}, nil) } finalize() + tme.expectCreateReverseVReplication() + tme.expectStartReverseVReplication() tme.expectDeleteTargetVReplication() - if _, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second); err != nil { + if _, err := tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true); err != nil { t.Fatal(err) } @@ -815,8 +820,8 @@ func TestStreamMigrateSyncFail(t *testing.T) { stopStreams := func() { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('Stopped') - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) var sourceRows [][]string for i, sourceTargetShard := range tme.sourceShards { @@ -854,7 +859,7 @@ func TestStreamMigrateSyncFail(t *testing.T) { for i, dbclient := range tme.dbSourceClients { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('') and VReplicationExec(_vt.copy_state) - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -868,21 +873,21 @@ func TestStreamMigrateSyncFail(t *testing.T) { dbclient.addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) // sm.stopStreams->sm.stopSourceStreams->sm.readTabletStreams('id in...') - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), nil) // sm.stopStreams->sm.verifyStreamPositions->sm.readTabletStreams('id in...') - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and id in (1, 2)", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), nil) // sm.cancelMigration->sm.readSourceStreamsForCancel: this is not actually stopStream, but we're reusing the bls here. - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -934,7 +939,7 @@ func TestStreamMigrateSyncFail(t *testing.T) { tme.expectCancelMigration() - _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true) want := "does not match" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateWrites err: %v, want %s", err, want) @@ -960,8 +965,8 @@ func TestStreamMigrateCancel(t *testing.T) { stopStreamsFail := func() { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('Stopped') - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) // pre-compute sourceRows because they're re-read multiple times. var sourceRows [][]string @@ -985,7 +990,7 @@ func TestStreamMigrateCancel(t *testing.T) { for i, dbclient := range tme.dbSourceClients { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('') and VReplicationExec(_vt.copy_state) - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -994,18 +999,11 @@ func TestStreamMigrateCancel(t *testing.T) { // sm.stopStreams->sm.stopSourceStreams->VReplicationExec('Stopped'): fail this dbclient.addQuery("select id from _vt.vreplication where id in (1, 2)", nil, fmt.Errorf("intentionally failed")) - - // sm.cancelMigration->sm.readSourceStreamsForCancel: this is not actually stopStream, but we're reusing the bls here. - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|workflow|source|pos", - "int64|varbinary|varchar|varbinary"), - sourceRows[i]...), - nil) } } stopStreamsFail() - smCancelMigration := func() { + cancelMigration := func() { // sm.migrateStreams->sm.deleteTargetStreams tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid34, nil) tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid34, nil) @@ -1015,20 +1013,24 @@ func TestStreamMigrateCancel(t *testing.T) { tme.dbTargetClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (3, 4)", &sqltypes.Result{}, nil) // sm.migrateStreams->->restart source streams - tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid12, nil) - tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow in ('t1')", resultid12, nil) + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow != 'test_reverse'", resultid12, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow != 'test_reverse'", resultid12, nil) tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', stop_pos = null, message = '' where id in (1, 2)", &sqltypes.Result{}, nil) tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', stop_pos = null, message = '' where id in (1, 2)", &sqltypes.Result{}, nil) tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 1", runningResult(1), nil) tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 1", runningResult(1), nil) tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 2", runningResult(2), nil) tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 2", runningResult(2), nil) - } - smCancelMigration() - tme.expectCancelMigration() + // mi.cancelMigration->restart target streams + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", &sqltypes.Result{}, nil) + tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", &sqltypes.Result{}, nil) + + tme.expectDeleteReverseVReplication() + } + cancelMigration() - _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true) want := "intentionally failed" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateWrites err: %v, want %s", err, want) @@ -1087,7 +1089,7 @@ func TestStreamMigrateStoppedStreams(t *testing.T) { for i, dbclient := range tme.dbSourceClients { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('') and VReplicationExec(_vt.copy_state) - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -1096,8 +1098,9 @@ func TestStreamMigrateStoppedStreams(t *testing.T) { } } stopStreams() + tme.expectCancelMigration() - _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true) want := "cannot migrate until all strems are running: 0" if err == nil || err.Error() != want { t.Errorf("MigrateWrites err: %v, want %v", err, want) @@ -1123,7 +1126,7 @@ func TestStreamMigrateStillCopying(t *testing.T) { stopStreams := func() { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('Stopped') - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) // pre-compute sourceRows because they're re-read multiple times. var sourceRows [][]string @@ -1147,7 +1150,7 @@ func TestStreamMigrateStillCopying(t *testing.T) { for i, dbclient := range tme.dbSourceClients { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('') and VReplicationExec(_vt.copy_state) - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -1156,8 +1159,9 @@ func TestStreamMigrateStillCopying(t *testing.T) { } } stopStreams() + tme.expectCancelMigration() - _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true) want := "cannot migrate while vreplication streams in source shards are still copying: 0" if err == nil || err.Error() != want { t.Errorf("MigrateWrites err: %v, want %v", err, want) @@ -1183,7 +1187,7 @@ func TestStreamMigrateEmptyWorflow(t *testing.T) { stopStreams := func() { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('Stopped') - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) // pre-compute sourceRows because they're re-read multiple times. var sourceRows [][]string @@ -1207,7 +1211,7 @@ func TestStreamMigrateEmptyWorflow(t *testing.T) { for i, dbclient := range tme.dbSourceClients { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('') and VReplicationExec(_vt.copy_state) - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -1215,8 +1219,9 @@ func TestStreamMigrateEmptyWorflow(t *testing.T) { } } stopStreams() + tme.expectCancelMigration() - _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true) want := "VReplication streams must have named workflows for migration: shard: ks:0, stream: 1" if err == nil || err.Error() != want { t.Errorf("MigrateWrites err: %v, want %v", err, want) @@ -1242,7 +1247,7 @@ func TestStreamMigrateDupWorflow(t *testing.T) { stopStreams := func() { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('Stopped') - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) // pre-compute sourceRows because they're re-read multiple times. var sourceRows [][]string @@ -1266,7 +1271,7 @@ func TestStreamMigrateDupWorflow(t *testing.T) { for i, dbclient := range tme.dbSourceClients { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('') and VReplicationExec(_vt.copy_state) - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -1274,8 +1279,9 @@ func TestStreamMigrateDupWorflow(t *testing.T) { } } stopStreams() + tme.expectCancelMigration() - _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true) want := "VReplication stream has the same workflow name as the resharding workflow: shard: ks:0, stream: 1" if err == nil || err.Error() != want { t.Errorf("MigrateWrites err: %v, want %v", err, want) @@ -1302,8 +1308,8 @@ func TestStreamMigrateStreamsMismatch(t *testing.T) { stopStreams := func() { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('Stopped') - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped'", &sqltypes.Result{}, nil) // pre-compute sourceRows because they're re-read multiple times. var sourceRows [][]string @@ -1331,7 +1337,7 @@ func TestStreamMigrateStreamsMismatch(t *testing.T) { for i, dbclient := range tme.dbSourceClients { // sm.stopStreams->sm.readSourceStreams->readTabletStreams('') and VReplicationExec(_vt.copy_state) - dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + dbclient.addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|workflow|source|pos", "int64|varbinary|varchar|varbinary"), sourceRows[i]...), @@ -1344,8 +1350,9 @@ func TestStreamMigrateStreamsMismatch(t *testing.T) { } } stopStreams() + tme.expectCancelMigration() - _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second) + _, err = tme.wr.MigrateWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true) want := "streams are mismatched across source shards" if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("MigrateWrites err: %v, must contain %v", err, want)