Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ func expectNontxQueries(t *testing.T, queries []string) {
retry:
select {
case got = <-globalDBQueries:
if got == "begin" || got == "commit" || strings.Contains(got, "_vt.vreplication") {
if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "_vt.vreplication") {
goto retry
}
var match bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,43 +190,28 @@ func TestPlayerCopyBigTable(t *testing.T) {
expectDeleteQueries(t)
}()

expectDBClientQueries(t, []string{
"/insert into _vt.vreplication",
expectNontxQueries(t, []string{
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"commit",
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=",
"begin",
"insert into dst(id,val) values (1,'aaa')",
`/update _vt.copy_state set lastpk='fields:<name:\\"id\\" type:INT32 > rows:<lengths:1 values:\\"1\\" > ' where vrepl_id=.*`,
"commit",
// The next catchup executes the new row insert, but will be a no-op.
"begin",
"insert into dst(id,val) select 3, 'ccc' from dual where (3) <= (1)",
"/update _vt.vreplication set pos=",
"commit",
// fastForward has nothing to add. Just saves position.
"begin",
"/update _vt.vreplication set pos=",
"commit",
// Second row gets copied.
"begin",
"insert into dst(id,val) values (2,'bbb')",
`/update _vt.copy_state set lastpk='fields:<name:\\"id\\" type:INT32 > rows:<lengths:1 values:\\"2\\" > ' where vrepl_id=.*`,
"commit",
// Third row copied without going back to catchup state.
"begin",
"insert into dst(id,val) values (3,'ccc')",
`/update _vt.copy_state set lastpk='fields:<name:\\"id\\" type:INT32 > rows:<lengths:1 values:\\"3\\" > ' where vrepl_id=.*`,
"commit",
"/delete from _vt.copy_state.*dst",
// Copy is done. Go into running state.
"/update _vt.vreplication set state='Running'",
// All tables copied. Final catch up followed by Running state.
})
expectDBClientQueries(t, []string{
"/update _vt.vreplication set state='Running'",
})
expectData(t, "dst", [][]string{
{"1", "aaa"},
{"2", "bbb"},
Expand Down Expand Up @@ -317,43 +302,28 @@ func TestPlayerCopyWildcardRule(t *testing.T) {
expectDeleteQueries(t)
}()

expectDBClientQueries(t, []string{
"/insert into _vt.vreplication",
expectNontxQueries(t, []string{
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"commit",
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=",
"begin",
"insert into src(id,val) values (1,'aaa')",
`/update _vt.copy_state set lastpk='fields:<name:\\"id\\" type:INT32 > rows:<lengths:1 values:\\"1\\" > ' where vrepl_id=.*`,
"commit",
// The next catchup executes the new row insert, but will be a no-op.
"begin",
"insert into src(id,val) select 3, 'ccc' from dual where (3) <= (1)",
"/update _vt.vreplication set pos=",
"commit",
// fastForward has nothing to add. Just saves position.
"begin",
"/update _vt.vreplication set pos=",
"commit",
// Second row gets copied.
"begin",
"insert into src(id,val) values (2,'bbb')",
`/update _vt.copy_state set lastpk='fields:<name:\\"id\\" type:INT32 > rows:<lengths:1 values:\\"2\\" > ' where vrepl_id=.*`,
"commit",
// Third row copied without going back to catchup state.
"begin",
"insert into src(id,val) values (3,'ccc')",
`/update _vt.copy_state set lastpk='fields:<name:\\"id\\" type:INT32 > rows:<lengths:1 values:\\"3\\" > ' where vrepl_id=.*`,
"commit",
"/delete from _vt.copy_state.*src",
// Copy is done. Go into running state.
"/update _vt.vreplication set state='Running'",
// All tables copied. Final catch up followed by Running state.
})
expectDBClientQueries(t, []string{
"/update _vt.vreplication set state='Running'",
})
expectData(t, "src", [][]string{
{"1", "aaa"},
{"2", "bbb"},
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
// TODO(sougou): if we also stored the time of the last event, we
// can estimate this value more accurately.
defer vp.vr.stats.SecondsBehindMaster.Set(math.MaxInt64)
var sbm int64 = -1
for {
items, err := relay.Fetch()
if err != nil {
Expand Down Expand Up @@ -322,7 +323,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
if event.Timestamp != 0 {
vp.lastTimestampNs = event.Timestamp * 1e9
vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
vp.vr.stats.SecondsBehindMaster.Set(event.CurrentTime/1e9 - event.Timestamp)
sbm = event.CurrentTime/1e9 - event.Timestamp
}
mustSave := false
switch event.Type {
Expand All @@ -348,6 +349,10 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
}
}
}
if sbm >= 0 {
vp.vr.stats.SecondsBehindMaster.Set(sbm)
}

}
}

Expand Down
5 changes: 3 additions & 2 deletions go/vt/wrangler/stream_migrater.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ package wrangler

import (
"fmt"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"sort"
"strings"
"sync"
"text/template"

"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
Expand Down
4 changes: 4 additions & 0 deletions go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) {
dbclient := newFakeDBClient()
tme.dbSourceClients = append(tme.dbSourceClients, dbclient)
dbClientFactory := func() binlogplayer.DBClient { return dbclient }
// Replace existing engine with a new one
master.Agent.VREngine.Close()
master.Agent.VREngine = vreplication.NewEngine(tme.ts, "", master.FakeMysqlDaemon, dbClientFactory, dbclient.DBName())
if err := master.Agent.VREngine.Open(ctx); err != nil {
t.Fatal(err)
Expand All @@ -340,6 +342,8 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) {
dbclient := newFakeDBClient()
tme.dbTargetClients = append(tme.dbTargetClients, dbclient)
dbClientFactory := func() binlogplayer.DBClient { return dbclient }
// Replace existing engine with a new one
master.Agent.VREngine.Close()
master.Agent.VREngine = vreplication.NewEngine(tme.ts, "", master.FakeMysqlDaemon, dbClientFactory, dbclient.DBName())
if err := master.Agent.VREngine.Open(ctx); err != nil {
t.Fatal(err)
Expand Down