Skip to content
This repository was archived by the owner on Dec 16, 2022. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/cmd/vttablet/vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ func main() {
log.Exitf("NewActionAgent() failed: %v", err)
}

tabletserver.InitAPI(agent.VREngine)

servenv.OnClose(func() {
// Close the agent so that our topo entry gets pruned properly and any
// background goroutines that use the topo connection are stopped.
Expand Down
1 change: 0 additions & 1 deletion go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func (flv *filePosFlavor) masterGTIDSet(c *Conn) (GTIDSet, error) {
if len(qr.Rows) == 0 {
return nil, errors.New("no master status")
}

resultMap, err := resultToMap(qr)
if err != nil {
return nil, err
Expand Down
29 changes: 18 additions & 11 deletions go/vt/vttablet/tabletmanager/action_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,10 @@ func NewActionAgent(

vreplication.InitVStreamerClient(agent.DBConfigs)

// The db name is set by the Start function called above
filteredWithDBParams, _ := agent.DBConfigs.FilteredWithDB().MysqlParams()
agent.VREngine = vreplication.NewEngine(ts, tabletAlias.Cell, mysqld, func() binlogplayer.DBClient {
// TODO(@setassociative, merge resolution): added filteredWithDBParams; previously set by Start
// agent.VREngine = vreplication.NewEngine(ts, tabletAlias.Cell, mysqld, func() binlogplayer.DBClient {
agent.VREngine = vreplication.NewEngine(ts, tabletAlias.GetCell(), agent.Tablet(), mysqld, func() binlogplayer.DBClient {
return binlogplayer.NewDBClient(agent.DBConfigs.FilteredWithDB())
},
filteredWithDBParams.DbName,
Expand Down Expand Up @@ -418,10 +419,13 @@ func NewTestActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias *
TabletAlias: tabletAlias,
Cnf: nil,
MysqlDaemon: mysqlDaemon,
VREngine: vreplication.NewEngine(ts, tabletAlias.Cell, mysqlDaemon, binlogplayer.NewFakeDBClient, ti.DbName()),
History: history.New(historyLength),
DemoteMasterType: demoteMasterTabletType,
_healthy: fmt.Errorf("healthcheck not run yet"),
DBConfigs: &dbconfigs.DBConfigs{},
// TODO(@setassociative, merge resolution):
// vreplication.NewEngine(ts, tabletAlias.Cell, mysqlDaemon, binlogplayer.NewFakeDBClient, ti.DbName()),
VREngine: vreplication.NewEngine(ts, tabletAlias.GetCell(), nil, mysqlDaemon, binlogplayer.NewFakeDBClient, ti.DbName()),
History: history.New(historyLength),
DemoteMasterType: demoteMasterTabletType,
_healthy: fmt.Errorf("healthcheck not run yet"),
}
if preStart != nil {
preStart(agent)
Expand Down Expand Up @@ -462,11 +466,14 @@ func NewComboActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias
TabletAlias: tabletAlias,
Cnf: nil,
MysqlDaemon: mysqlDaemon,
VREngine: vreplication.NewEngine(nil, "", nil, nil, ""),
gotMysqlPort: true,
History: history.New(historyLength),
DemoteMasterType: demoteMasterType,
_healthy: fmt.Errorf("healthcheck not run yet"),
DBConfigs: dbcfgs,
// TODO(@setassociative, merge resolution):
// VREngine: vreplication.NewEngine(nil, "", nil, nil, ""),
VREngine: vreplication.NewEngine(nil, "", nil, nil, nil, ""),
gotMysqlPort: true,
History: history.New(historyLength),
DemoteMasterType: demoteMasterType,
_healthy: fmt.Errorf("healthcheck not run yet"),
}
agent.registerQueryRuleSources()

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/init_tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestInitTablet(t *testing.T) {
TabletAlias: tabletAlias,
MysqlDaemon: mysqlDaemon,
DBConfigs: &dbconfigs.DBConfigs{},
VREngine: vreplication.NewEngine(nil, "", nil, nil, ""),
VREngine: vreplication.NewEngine(nil, "", nil, nil, nil, ""),
batchCtx: ctx,
History: history.New(historyLength),
_healthy: fmt.Errorf("healthcheck not run yet"),
Expand Down
49 changes: 48 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,53 @@ func (ct *controller) run(ctx context.Context) {
}
}

func (ct *controller) runVDiff(ctx context.Context) (err error) {
defer func() {
ct.sourceTablet.Set("")
if x := recover(); x != nil {
log.Errorf("stream %v: caught panic: %v\n%s", ct.id, x, tb.Stack(4))
err = fmt.Errorf("panic: %v", x)
}
}()

select {
case <-ctx.Done():
return nil
default:
}

dbClient := ct.dbClientFactory()
if err := dbClient.Connect(); err != nil {
return vterrors.Wrap(err, "can't connect to database")
}
defer dbClient.Close()

var tablet *topodatapb.Tablet
if ct.source.GetExternalMysql() == "" {
log.Infof("trying to find a tablet eligible for vreplication. stream id: %v", ct.id)
tablet, err = ct.tabletPicker.PickForStreaming(ctx)
if err != nil {
return err
}
log.Infof("found a tablet eligible for vreplication. stream id: %v tablet: %s", ct.id, tablet.Alias.String())
ct.sourceTablet.Set(tablet.Alias.String())
}

switch {
case ct.source.Filter != nil:
var vsClient VStreamerClient
if ct.source.GetExternalMysql() == "" {
vsClient = NewTabletVStreamerClient(tablet, ct.mysqld)
} else {
vsClient = NewMySQLVStreamerClient()
}

vd := newVDiffer(ct.id, &ct.source, vsClient, ct.blpStats, dbClient, ct.vre, ct.workflow)
return vd.VDiff(ctx, 60*time.Second)
}
return fmt.Errorf("missing source")
}

func (ct *controller) runBlp(ctx context.Context) (err error) {
defer func() {
ct.sourceTablet.Set("")
Expand Down Expand Up @@ -223,7 +270,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {

var vsClient VStreamerClient
if ct.source.GetExternalMysql() == "" {
vsClient = NewTabletVStreamerClient(tablet)
vsClient = NewTabletVStreamerClient(tablet, nil)
} else {
vsClient = NewMySQLVStreamerClient()
}
Expand Down
77 changes: 74 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package vreplication

import (
"encoding/json"
"errors"
"flag"
"fmt"
Expand All @@ -32,9 +33,11 @@ import (
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/topo"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/topo"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

const (
Expand Down Expand Up @@ -82,12 +85,19 @@ type Engine struct {
cancel context.CancelFunc

ts *topo.Server
tablet *topodatapb.Tablet
cell string
mysqld mysqlctl.MysqlDaemon
dbClientFactory func() binlogplayer.DBClient
dbName string

journaler map[string]*journalEvent

// VDiff Hack
isVDiffRunning bool
vdiffCancel context.CancelFunc
vdiffMu sync.Mutex
vdiffError error
}

type journalEvent struct {
Expand All @@ -98,10 +108,11 @@ type journalEvent struct {

// NewEngine creates a new Engine.
// A nil ts means that the Engine is disabled.
func NewEngine(ts *topo.Server, cell string, mysqld mysqlctl.MysqlDaemon, dbClientFactory func() binlogplayer.DBClient, dbName string) *Engine {
func NewEngine(ts *topo.Server, cell string, tablet *topodatapb.Tablet, mysqld mysqlctl.MysqlDaemon, dbClientFactory func() binlogplayer.DBClient, dbName string) *Engine {
vre := &Engine{
controllers: make(map[int]*controller),
ts: ts,
tablet: tablet,
cell: cell,
mysqld: mysqld,
dbClientFactory: dbClientFactory,
Expand Down Expand Up @@ -217,6 +228,66 @@ func (vre *Engine) IsOpen() bool {
return vre.isOpen
}

// RunVdiff starts a vdiff run on this tablet
func (vre *Engine) RunVdiff() {
vre.vdiffMu.Lock()
defer vre.vdiffMu.Unlock()
if vre.isVDiffRunning {
return
}
vre.isVDiffRunning = true
vre.vdiffError = nil
var ctx context.Context
ctx, vre.vdiffCancel = context.WithCancel(context.Background())
go func() {
if len(vre.controllers) != 1 {
log.Infof("There are no replication streams, nothing to diff")
return
}
// This is using the vreplication id that stars at 1
ctrl := vre.controllers[1]
if ctrl == nil {
log.Infof("VReplication ctrl is nil, this shouldn't happen")
}
diffError := ctrl.runVDiff(ctx)
vre.vdiffMu.Lock()
defer vre.vdiffMu.Unlock()
vre.vdiffError = diffError
vre.isVDiffRunning = false
}()
}

// AbortVdiff aborts current vdiff run
func (vre *Engine) AbortVdiff() {
vre.vdiffMu.Lock()
defer vre.vdiffMu.Unlock()
if !vre.isVDiffRunning {
return
}
vre.isVDiffRunning = false
vre.vdiffCancel()
}

// VDiffReportStatus returns status for current VDiff run
func (vre *Engine) VDiffReportStatus() ([]byte, error) {
vre.vdiffMu.Lock()
defer vre.vdiffMu.Unlock()
resp := make(map[string]interface{})
if vre.isVDiffRunning {
resp["status"] = "running"
} else {
resp["status"] = "not_running"
}
if vre.vdiffError != nil {
resp["last_error"] = vre.vdiffError.Error()
return json.MarshalIndent(resp, "", " ")
}
if VDiffStatus() != nil {
resp["last_diff"] = VDiffStatus()
}
return json.MarshalIndent(resp, "", " ")
}

// Close closes the Engine service.
func (vre *Engine) Close() {
vre.mu.Lock()
Expand Down Expand Up @@ -633,7 +704,7 @@ func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error {
}

if current.AtLeast(mPos) {
log.Infof("position: %s reached, wait time: %v", pos, time.Since(start))
log.Infof("position: %s reached, wait time: %v", current, time.Since(start))
return nil
}

Expand Down
16 changes: 8 additions & 8 deletions go/vt/vttablet/tabletmanager/vreplication/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestEngineOpen(t *testing.T) {

// Test Insert

vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName())
vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName())
if vre.IsOpen() {
t.Errorf("IsOpen: %v, want false", vre.IsOpen())
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestEngineExec(t *testing.T) {

// Test Insert

vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName())
vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName())

dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil)
if err := vre.Open(context.Background()); err != nil {
Expand Down Expand Up @@ -249,7 +249,7 @@ func TestEngineBadInsert(t *testing.T) {
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306}

vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName())
vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName())

dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil)
if err := vre.Open(context.Background()); err != nil {
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestEngineSelect(t *testing.T) {
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306}

vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName())
vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName())

dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil)
if err := vre.Open(context.Background()); err != nil {
Expand Down Expand Up @@ -314,7 +314,7 @@ func TestWaitForPos(t *testing.T) {
dbClient := binlogplayer.NewMockDBClient(t)
mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306}
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName())
vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName())

dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil)
if err := vre.Open(context.Background()); err != nil {
Expand Down Expand Up @@ -344,7 +344,7 @@ func TestWaitForPosError(t *testing.T) {
dbClient := binlogplayer.NewMockDBClient(t)
mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306}
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName())
vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName())

err := vre.WaitForPos(context.Background(), 1, "MariaDB/0-1-1084")
want := `vreplication engine is closed`
Expand Down Expand Up @@ -386,7 +386,7 @@ func TestWaitForPosCancel(t *testing.T) {
dbClient := binlogplayer.NewMockDBClient(t)
mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306}
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName())
vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName())

dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil)
if err := vre.Open(context.Background()); err != nil {
Expand Down Expand Up @@ -434,7 +434,7 @@ func TestCreateDBAndTable(t *testing.T) {

// Test Insert

vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName())
vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName())

tableNotFound := mysql.SQLError{Num: 1146, Message: "table not found"}
dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", nil, &tableNotFound)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestMain(m *testing.M) {

InitVStreamerClient(env.Dbcfgs)

playerEngine = NewEngine(env.TopoServ, env.Cells[0], env.Mysqld, realDBClientFactory, vrepldb)
playerEngine = NewEngine(env.TopoServ, env.Cells[0], nil, env.Mysqld, realDBClientFactory, vrepldb)
if err := playerEngine.Open(context.Background()); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
return 1
Expand Down
Loading