From 1ea4f719cc82f1655a6bc21f2ea8a3e1dd8be9f8 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 30 Mar 2020 22:34:23 +0200 Subject: [PATCH 1/7] Change flaky tests to expect nonTx queries: WIP Signed-off-by: Rohit Nayak --- .../vreplication/vcopier_flaky_test.go | 42 +------------------ .../tabletmanager/vreplication/vplayer.go | 8 +++- 2 files changed, 9 insertions(+), 41 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_flaky_test.go index 9ee4648750b..80bafd5518f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_flaky_test.go @@ -187,44 +187,25 @@ func TestPlayerCopyBigTable(t *testing.T) { if _, err := playerEngine.Exec(query); err != nil { t.Fatal(err) } - expectDeleteQueries(t) }() - expectDBClientQueries(t, []string{ - "/insert into _vt.vreplication", + expectNontxQueries(t, []string{ // Create the list of tables to copy and transition to Copying state. - "begin", "/insert into _vt.copy_state", - "/update _vt.vreplication set state='Copying'", - "commit", // The first fast-forward has no starting point. So, it just saves the current position. - "/update _vt.vreplication set pos=", - "begin", "insert into dst(id,val) values (1,'aaa')", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, - "commit", // The next catchup executes the new row insert, but will be a no-op. - "begin", "insert into dst(id,val) select 3, 'ccc' from dual where (3) <= (1)", - "/update _vt.vreplication set pos=", - "commit", // fastForward has nothing to add. Just saves position. - "begin", - "/update _vt.vreplication set pos=", - "commit", // Second row gets copied. - "begin", "insert into dst(id,val) values (2,'bbb')", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, - "commit", // Third row copied without going back to catchup state. - "begin", "insert into dst(id,val) values (3,'ccc')", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, - "commit", "/delete from _vt.copy_state.*dst", // Copy is done. Go into running state. - "/update _vt.vreplication set state='Running'", // All tables copied. Final catch up followed by Running state. }) expectData(t, "dst", [][]string{ @@ -314,44 +295,25 @@ func TestPlayerCopyWildcardRule(t *testing.T) { if _, err := playerEngine.Exec(query); err != nil { t.Fatal(err) } - expectDeleteQueries(t) }() - expectDBClientQueries(t, []string{ - "/insert into _vt.vreplication", + expectNontxQueries(t, []string{ // Create the list of tables to copy and transition to Copying state. - "begin", "/insert into _vt.copy_state", - "/update _vt.vreplication set state='Copying'", - "commit", // The first fast-forward has no starting point. So, it just saves the current position. - "/update _vt.vreplication set pos=", - "begin", "insert into src(id,val) values (1,'aaa')", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, - "commit", // The next catchup executes the new row insert, but will be a no-op. - "begin", "insert into src(id,val) select 3, 'ccc' from dual where (3) <= (1)", - "/update _vt.vreplication set pos=", - "commit", // fastForward has nothing to add. Just saves position. - "begin", - "/update _vt.vreplication set pos=", - "commit", // Second row gets copied. - "begin", "insert into src(id,val) values (2,'bbb')", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, - "commit", // Third row copied without going back to catchup state. - "begin", "insert into src(id,val) values (3,'ccc')", `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, - "commit", "/delete from _vt.copy_state.*src", // Copy is done. Go into running state. - "/update _vt.vreplication set state='Running'", // All tables copied. Final catch up followed by Running state. }) expectData(t, "src", [][]string{ diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index e441e127286..c0336223428 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -290,6 +290,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { // TODO(sougou): if we also stored the time of the last event, we // can estimate this value more accurately. defer vp.vr.stats.SecondsBehindMaster.Set(math.MaxInt64) + var sbm int64 = -1 for { items, err := relay.Fetch() if err != nil { @@ -322,7 +323,8 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { if event.Timestamp != 0 { vp.lastTimestampNs = event.Timestamp * 1e9 vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime - vp.vr.stats.SecondsBehindMaster.Set(event.CurrentTime/1e9 - event.Timestamp) + //vp.vr.stats.SecondsBehindMaster.Set(event.CurrentTime/1e9 - event.Timestamp) + sbm = event.CurrentTime/1e9 - event.Timestamp } mustSave := false switch event.Type { @@ -348,6 +350,10 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { } } } + if sbm >= 0 { + vp.vr.stats.SecondsBehindMaster.Set(sbm) + } + } } From 5a5fe60f92d2fd06de84dd7e1653bfe79a4539cd Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 31 Mar 2020 22:45:47 +0200 Subject: [PATCH 2/7] Fixed flakiness Signed-off-by: Rohit Nayak --- .../{vcopier_flaky_test.go => vcopier_test.go} | 8 ++++++++ 1 file changed, 8 insertions(+) rename go/vt/vttablet/tabletmanager/vreplication/{vcopier_flaky_test.go => vcopier_test.go} (99%) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go similarity index 99% rename from go/vt/vttablet/tabletmanager/vreplication/vcopier_flaky_test.go rename to go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index 80bafd5518f..7ca8a7bdb41 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -187,6 +187,7 @@ func TestPlayerCopyBigTable(t *testing.T) { if _, err := playerEngine.Exec(query); err != nil { t.Fatal(err) } + expectDeleteQueries(t) }() expectNontxQueries(t, []string{ @@ -208,6 +209,9 @@ func TestPlayerCopyBigTable(t *testing.T) { // Copy is done. Go into running state. // All tables copied. Final catch up followed by Running state. }) + expectDBClientQueries(t,[]string{ + "/update _vt.vreplication set state='Running'", + } ) expectData(t, "dst", [][]string{ {"1", "aaa"}, {"2", "bbb"}, @@ -295,6 +299,7 @@ func TestPlayerCopyWildcardRule(t *testing.T) { if _, err := playerEngine.Exec(query); err != nil { t.Fatal(err) } + expectDeleteQueries(t) }() expectNontxQueries(t, []string{ @@ -316,6 +321,9 @@ func TestPlayerCopyWildcardRule(t *testing.T) { // Copy is done. Go into running state. // All tables copied. Final catch up followed by Running state. }) + expectDBClientQueries(t,[]string{ + "/update _vt.vreplication set state='Running'", + } ) expectData(t, "src", [][]string{ {"1", "aaa"}, {"2", "bbb"}, From ca4ecf7ad6ee3c510dd49a9d2d9e13992eaa8e2e Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 1 Apr 2020 13:23:07 +0200 Subject: [PATCH 3/7] Removed extraneous comment. Fixed gofmt Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletmanager/vreplication/vplayer.go | 1 - go/vt/wrangler/stream_migrater.go | 5 +++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index c0336223428..7b14d33a3bc 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -323,7 +323,6 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { if event.Timestamp != 0 { vp.lastTimestampNs = event.Timestamp * 1e9 vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime - //vp.vr.stats.SecondsBehindMaster.Set(event.CurrentTime/1e9 - event.Timestamp) sbm = event.CurrentTime/1e9 - event.Timestamp } mustSave := false diff --git a/go/vt/wrangler/stream_migrater.go b/go/vt/wrangler/stream_migrater.go index e5edad90878..963f6e711cb 100644 --- a/go/vt/wrangler/stream_migrater.go +++ b/go/vt/wrangler/stream_migrater.go @@ -18,12 +18,13 @@ package wrangler import ( "fmt" - "github.com/golang/protobuf/proto" - "golang.org/x/net/context" "sort" "strings" "sync" "text/template" + + "github.com/golang/protobuf/proto" + "golang.org/x/net/context" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" From 409a29ee49ebeb1f7ef6c01a249dc37bf4f74a82 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 1 Apr 2020 14:15:52 +0200 Subject: [PATCH 4/7] fixed styles Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index 7ca8a7bdb41..b698c93d863 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -209,9 +209,9 @@ func TestPlayerCopyBigTable(t *testing.T) { // Copy is done. Go into running state. // All tables copied. Final catch up followed by Running state. }) - expectDBClientQueries(t,[]string{ + expectDBClientQueries(t, []string{ "/update _vt.vreplication set state='Running'", - } ) + }) expectData(t, "dst", [][]string{ {"1", "aaa"}, {"2", "bbb"}, @@ -321,9 +321,9 @@ func TestPlayerCopyWildcardRule(t *testing.T) { // Copy is done. Go into running state. // All tables copied. Final catch up followed by Running state. }) - expectDBClientQueries(t,[]string{ + expectDBClientQueries(t, []string{ "/update _vt.vreplication set state='Running'", - } ) + }) expectData(t, "src", [][]string{ {"1", "aaa"}, {"2", "bbb"}, From 8f3205b8d7545203991635ebc614a869148a62ca Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 2 Apr 2020 21:07:56 +0200 Subject: [PATCH 5/7] Close unused VREngine and hence related healthcheck streams which were causing races since they continued to run Signed-off-by: Rohit Nayak --- go/vt/wrangler/traffic_switcher_env_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index e90f6d8508b..9e4dcd63ad9 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -331,6 +331,8 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) { dbclient := newFakeDBClient() tme.dbSourceClients = append(tme.dbSourceClients, dbclient) dbClientFactory := func() binlogplayer.DBClient { return dbclient } + // Replace existing engine with a new one + master.Agent.VREngine.Close() master.Agent.VREngine = vreplication.NewEngine(tme.ts, "", master.FakeMysqlDaemon, dbClientFactory, dbclient.DBName()) if err := master.Agent.VREngine.Open(ctx); err != nil { t.Fatal(err) @@ -340,6 +342,8 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) { dbclient := newFakeDBClient() tme.dbTargetClients = append(tme.dbTargetClients, dbclient) dbClientFactory := func() binlogplayer.DBClient { return dbclient } + // Replace existing engine with a new one + master.Agent.VREngine.Close() master.Agent.VREngine = vreplication.NewEngine(tme.ts, "", master.FakeMysqlDaemon, dbClientFactory, dbclient.DBName()) if err := master.Agent.VREngine.Open(ctx); err != nil { t.Fatal(err) From ca2954cdbd8164e4d0c972a9f4e63cfd3b2e1715 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 2 Apr 2020 21:14:58 +0200 Subject: [PATCH 6/7] Add rollback to ignored transactions. Rollbacks can occur if a context times out Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletmanager/vreplication/framework_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 72b9e2c3b82..44d7ebe314c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -497,7 +497,7 @@ func expectNontxQueries(t *testing.T, queries []string) { retry: select { case got = <-globalDBQueries: - if got == "begin" || got == "commit" || strings.Contains(got, "_vt.vreplication") { + if got == "begin" || got == "commit" || got == "rollback" ||strings.Contains(got, "_vt.vreplication") { goto retry } var match bool From 3ad80df32cb1580bff2c1364eee9eb9c5b71f91a Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 2 Apr 2020 22:32:06 +0200 Subject: [PATCH 7/7] gofmted Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletmanager/vreplication/framework_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 44d7ebe314c..a87f6fe0dfa 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -497,7 +497,7 @@ func expectNontxQueries(t *testing.T, queries []string) { retry: select { case got = <-globalDBQueries: - if got == "begin" || got == "commit" || got == "rollback" ||strings.Contains(got, "_vt.vreplication") { + if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "_vt.vreplication") { goto retry } var match bool