diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 72b9e2c3b82..a87f6fe0dfa 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -497,7 +497,7 @@ func expectNontxQueries(t *testing.T, queries []string) { retry: select { case got = <-globalDBQueries: - if got == "begin" || got == "commit" || strings.Contains(got, "_vt.vreplication") { + if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "_vt.vreplication") { goto retry } var match bool diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go similarity index 97% rename from go/vt/vttablet/tabletmanager/vreplication/vcopier_flaky_test.go rename to go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index 9ee4648750b..b698c93d863 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -190,43 +190,28 @@ func TestPlayerCopyBigTable(t *testing.T) { expectDeleteQueries(t) }() - expectDBClientQueries(t, []string{ - "/insert into _vt.vreplication", + expectNontxQueries(t, []string{ // Create the list of tables to copy and transition to Copying state. - "begin", "/insert into _vt.copy_state", - "/update _vt.vreplication set state='Copying'", - "commit", // The first fast-forward has no starting point. So, it just saves the current position. - "/update _vt.vreplication set pos=", - "begin", "insert into dst(id,val) values (1,'aaa')", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, - "commit", // The next catchup executes the new row insert, but will be a no-op. - "begin", "insert into dst(id,val) select 3, 'ccc' from dual where (3) <= (1)", - "/update _vt.vreplication set pos=", - "commit", // fastForward has nothing to add. Just saves position. - "begin", - "/update _vt.vreplication set pos=", - "commit", // Second row gets copied. - "begin", "insert into dst(id,val) values (2,'bbb')", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, - "commit", // Third row copied without going back to catchup state. - "begin", "insert into dst(id,val) values (3,'ccc')", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, - "commit", "/delete from _vt.copy_state.*dst", // Copy is done. Go into running state. - "/update _vt.vreplication set state='Running'", // All tables copied. Final catch up followed by Running state. }) + expectDBClientQueries(t, []string{ + "/update _vt.vreplication set state='Running'", + }) expectData(t, "dst", [][]string{ {"1", "aaa"}, {"2", "bbb"}, @@ -317,43 +302,28 @@ func TestPlayerCopyWildcardRule(t *testing.T) { expectDeleteQueries(t) }() - expectDBClientQueries(t, []string{ - "/insert into _vt.vreplication", + expectNontxQueries(t, []string{ // Create the list of tables to copy and transition to Copying state. - "begin", "/insert into _vt.copy_state", - "/update _vt.vreplication set state='Copying'", - "commit", // The first fast-forward has no starting point. So, it just saves the current position. - "/update _vt.vreplication set pos=", - "begin", "insert into src(id,val) values (1,'aaa')", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, - "commit", // The next catchup executes the new row insert, but will be a no-op. - "begin", "insert into src(id,val) select 3, 'ccc' from dual where (3) <= (1)", - "/update _vt.vreplication set pos=", - "commit", // fastForward has nothing to add. Just saves position. - "begin", - "/update _vt.vreplication set pos=", - "commit", // Second row gets copied. - "begin", "insert into src(id,val) values (2,'bbb')", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, - "commit", // Third row copied without going back to catchup state. - "begin", "insert into src(id,val) values (3,'ccc')", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, - "commit", "/delete from _vt.copy_state.*src", // Copy is done. Go into running state. - "/update _vt.vreplication set state='Running'", // All tables copied. Final catch up followed by Running state. }) + expectDBClientQueries(t, []string{ + "/update _vt.vreplication set state='Running'", + }) expectData(t, "src", [][]string{ {"1", "aaa"}, {"2", "bbb"}, diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index e441e127286..7b14d33a3bc 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -290,6 +290,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { // TODO(sougou): if we also stored the time of the last event, we // can estimate this value more accurately. defer vp.vr.stats.SecondsBehindMaster.Set(math.MaxInt64) + var sbm int64 = -1 for { items, err := relay.Fetch() if err != nil { @@ -322,7 +323,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { if event.Timestamp != 0 { vp.lastTimestampNs = event.Timestamp * 1e9 vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime - vp.vr.stats.SecondsBehindMaster.Set(event.CurrentTime/1e9 - event.Timestamp) + sbm = event.CurrentTime/1e9 - event.Timestamp } mustSave := false switch event.Type { @@ -348,6 +349,10 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { } } } + if sbm >= 0 { + vp.vr.stats.SecondsBehindMaster.Set(sbm) + } + } } diff --git a/go/vt/wrangler/stream_migrater.go b/go/vt/wrangler/stream_migrater.go index e5edad90878..963f6e711cb 100644 --- a/go/vt/wrangler/stream_migrater.go +++ b/go/vt/wrangler/stream_migrater.go @@ -18,12 +18,13 @@ package wrangler import ( "fmt" - "github.com/golang/protobuf/proto" - "golang.org/x/net/context" "sort" "strings" "sync" "text/template" + + "github.com/golang/protobuf/proto" + "golang.org/x/net/context" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index e90f6d8508b..9e4dcd63ad9 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -331,6 +331,8 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) { dbclient := newFakeDBClient() tme.dbSourceClients = append(tme.dbSourceClients, dbclient) dbClientFactory := func() binlogplayer.DBClient { return dbclient } + // Replace existing engine with a new one + master.Agent.VREngine.Close() master.Agent.VREngine = vreplication.NewEngine(tme.ts, "", master.FakeMysqlDaemon, dbClientFactory, dbclient.DBName()) if err := master.Agent.VREngine.Open(ctx); err != nil { t.Fatal(err) @@ -340,6 +342,8 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) { dbclient := newFakeDBClient() tme.dbTargetClients = append(tme.dbTargetClients, dbclient) dbClientFactory := func() binlogplayer.DBClient { return dbclient } + // Replace existing engine with a new one + master.Agent.VREngine.Close() master.Agent.VREngine = vreplication.NewEngine(tme.ts, "", master.FakeMysqlDaemon, dbClientFactory, dbclient.DBName()) if err := master.Agent.VREngine.Open(ctx); err != nil { t.Fatal(err)