diff --git a/go/cmd/vttablet/vttablet.go b/go/cmd/vttablet/vttablet.go index 2ee6b8f2124..62c2d5f49d4 100644 --- a/go/cmd/vttablet/vttablet.go +++ b/go/cmd/vttablet/vttablet.go @@ -144,6 +144,8 @@ func main() { log.Exitf("NewActionAgent() failed: %v", err) } + tabletserver.InitAPI(agent.VREngine) + servenv.OnClose(func() { // Close the agent so that our topo entry gets pruned properly and any // background goroutines that use the topo connection are stopped. diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index 9b943ba5eab..6bbc4972976 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -47,7 +47,6 @@ func (flv *filePosFlavor) masterGTIDSet(c *Conn) (GTIDSet, error) { if len(qr.Rows) == 0 { return nil, errors.New("no master status") } - resultMap, err := resultToMap(qr) if err != nil { return nil, err diff --git a/go/vt/vttablet/tabletmanager/action_agent.go b/go/vt/vttablet/tabletmanager/action_agent.go index 427eb9b0f58..f30083bf6f1 100644 --- a/go/vt/vttablet/tabletmanager/action_agent.go +++ b/go/vt/vttablet/tabletmanager/action_agent.go @@ -322,9 +322,10 @@ func NewActionAgent( vreplication.InitVStreamerClient(agent.DBConfigs) - // The db name is set by the Start function called above filteredWithDBParams, _ := agent.DBConfigs.FilteredWithDB().MysqlParams() - agent.VREngine = vreplication.NewEngine(ts, tabletAlias.Cell, mysqld, func() binlogplayer.DBClient { + // TODO(@setassociative, merge resolution): added filteredWithDBParams; previously set by Start + // agent.VREngine = vreplication.NewEngine(ts, tabletAlias.Cell, mysqld, func() binlogplayer.DBClient { + agent.VREngine = vreplication.NewEngine(ts, tabletAlias.GetCell(), agent.Tablet(), mysqld, func() binlogplayer.DBClient { return binlogplayer.NewDBClient(agent.DBConfigs.FilteredWithDB()) }, filteredWithDBParams.DbName, @@ -418,10 +419,13 @@ func NewTestActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias * TabletAlias: tabletAlias, Cnf: nil, 
MysqlDaemon: mysqlDaemon, - VREngine: vreplication.NewEngine(ts, tabletAlias.Cell, mysqlDaemon, binlogplayer.NewFakeDBClient, ti.DbName()), - History: history.New(historyLength), - DemoteMasterType: demoteMasterTabletType, - _healthy: fmt.Errorf("healthcheck not run yet"), + DBConfigs: &dbconfigs.DBConfigs{}, + // TODO(@setassociative, merge resolution): + // vreplication.NewEngine(ts, tabletAlias.Cell, mysqlDaemon, binlogplayer.NewFakeDBClient, ti.DbName()), + VREngine: vreplication.NewEngine(ts, tabletAlias.GetCell(), nil, mysqlDaemon, binlogplayer.NewFakeDBClient, ti.DbName()), + History: history.New(historyLength), + DemoteMasterType: demoteMasterTabletType, + _healthy: fmt.Errorf("healthcheck not run yet"), } if preStart != nil { preStart(agent) @@ -462,11 +466,14 @@ func NewComboActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias TabletAlias: tabletAlias, Cnf: nil, MysqlDaemon: mysqlDaemon, - VREngine: vreplication.NewEngine(nil, "", nil, nil, ""), - gotMysqlPort: true, - History: history.New(historyLength), - DemoteMasterType: demoteMasterType, - _healthy: fmt.Errorf("healthcheck not run yet"), + DBConfigs: dbcfgs, + // TODO(@setassociative, merge resolution): + // VREngine: vreplication.NewEngine(nil, "", nil, nil, ""), + VREngine: vreplication.NewEngine(nil, "", nil, nil, nil, ""), + gotMysqlPort: true, + History: history.New(historyLength), + DemoteMasterType: demoteMasterType, + _healthy: fmt.Errorf("healthcheck not run yet"), } agent.registerQueryRuleSources() diff --git a/go/vt/vttablet/tabletmanager/init_tablet_test.go b/go/vt/vttablet/tabletmanager/init_tablet_test.go index 409fb6d77c9..adbe5250943 100644 --- a/go/vt/vttablet/tabletmanager/init_tablet_test.go +++ b/go/vt/vttablet/tabletmanager/init_tablet_test.go @@ -182,7 +182,7 @@ func TestInitTablet(t *testing.T) { TabletAlias: tabletAlias, MysqlDaemon: mysqlDaemon, DBConfigs: &dbconfigs.DBConfigs{}, - VREngine: vreplication.NewEngine(nil, "", nil, nil, ""), + VREngine: 
vreplication.NewEngine(nil, "", nil, nil, nil, ""), batchCtx: ctx, History: history.New(historyLength), _healthy: fmt.Errorf("healthcheck not run yet"), diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index b215edd2771..e3e24c4ca4e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -158,6 +158,53 @@ func (ct *controller) run(ctx context.Context) { } } +func (ct *controller) runVDiff(ctx context.Context) (err error) { // runVDiff runs one VDiff for this controller's stream; a panic is recovered, logged and returned as err. + defer func() { + ct.sourceTablet.Set("") // Reset sourceTablet on every exit path. + if x := recover(); x != nil { + log.Errorf("stream %v: caught panic: %v\n%s", ct.id, x, tb.Stack(4)) + err = fmt.Errorf("panic: %v", x) + } + }() + + select { // Return early, without an error, when ctx is already done. + case <-ctx.Done(): + return nil + default: + } + + dbClient := ct.dbClientFactory() + if err := dbClient.Connect(); err != nil { + return vterrors.Wrap(err, "can't connect to database") + } + defer dbClient.Close() + + var tablet *topodatapb.Tablet // Stays nil when the source names an external MySQL. + if ct.source.GetExternalMysql() == "" { + log.Infof("trying to find a tablet eligible for vreplication. stream id: %v", ct.id) + tablet, err = ct.tabletPicker.PickForStreaming(ctx) + if err != nil { + return err + } + log.Infof("found a tablet eligible for vreplication.
stream id: %v tablet: %s", ct.id, tablet.Alias.String()) + ct.sourceTablet.Set(tablet.Alias.String()) + } + + switch { + case ct.source.Filter != nil: + var vsClient VStreamerClient + if ct.source.GetExternalMysql() == "" { + vsClient = NewTabletVStreamerClient(tablet, ct.mysqld) + } else { + vsClient = NewMySQLVStreamerClient() + } + + vd := newVDiffer(ct.id, &ct.source, vsClient, ct.blpStats, dbClient, ct.vre, ct.workflow) + return vd.VDiff(ctx, 60*time.Second) // 60s is passed as VDiff's filteredReplicationWaitTime. + } + return fmt.Errorf("missing source") // Reached only when the source has no Filter. +} + func (ct *controller) runBlp(ctx context.Context) (err error) { defer func() { ct.sourceTablet.Set("") @@ -223,7 +270,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { var vsClient VStreamerClient if ct.source.GetExternalMysql() == "" { - vsClient = NewTabletVStreamerClient(tablet) + vsClient = NewTabletVStreamerClient(tablet, nil) } else { vsClient = NewMySQLVStreamerClient() } diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 1c77300455c..99f53882ddf 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -17,6 +17,7 @@ limitations under the License.
package vreplication import ( + "encoding/json" "errors" "flag" "fmt" @@ -32,9 +33,11 @@ import ( "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" + "vitess.io/vitess/go/vt/topo" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" - "vitess.io/vitess/go/vt/topo" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) const ( @@ -82,12 +85,19 @@ type Engine struct { cancel context.CancelFunc ts *topo.Server + tablet *topodatapb.Tablet cell string mysqld mysqlctl.MysqlDaemon dbClientFactory func() binlogplayer.DBClient dbName string journaler map[string]*journalEvent + + // VDiff Hack + isVDiffRunning bool + vdiffCancel context.CancelFunc + vdiffMu sync.Mutex + vdiffError error } type journalEvent struct { @@ -98,10 +108,11 @@ type journalEvent struct { // NewEngine creates a new Engine. // A nil ts means that the Engine is disabled. -func NewEngine(ts *topo.Server, cell string, mysqld mysqlctl.MysqlDaemon, dbClientFactory func() binlogplayer.DBClient, dbName string) *Engine { +func NewEngine(ts *topo.Server, cell string, tablet *topodatapb.Tablet, mysqld mysqlctl.MysqlDaemon, dbClientFactory func() binlogplayer.DBClient, dbName string) *Engine { vre := &Engine{ controllers: make(map[int]*controller), ts: ts, + tablet: tablet, cell: cell, mysqld: mysqld, dbClientFactory: dbClientFactory, @@ -217,6 +228,83 @@ func (vre *Engine) IsOpen() bool { return vre.isOpen } +// RunVdiff starts a vdiff run on this tablet. +// It is a no-op when a run is already in progress. The diff executes in a +// background goroutine; its outcome is exposed through VDiffReportStatus. +func (vre *Engine) RunVdiff() { + vre.vdiffMu.Lock() + defer vre.vdiffMu.Unlock() + if vre.isVDiffRunning { + return + } + vre.isVDiffRunning = true + vre.vdiffError = nil + ctx, cancel := context.WithCancel(context.Background()) + vre.vdiffCancel = cancel + go func() { + // Release the context however the run ends. + defer cancel() + diffError := vre.runSingleStreamVDiff(ctx) + // Always publish the outcome and clear the running flag, including when + // there was nothing to diff; otherwise no later run could ever start. + vre.vdiffMu.Lock() + defer vre.vdiffMu.Unlock() + vre.vdiffError = diffError + vre.isVDiffRunning = false + }() +} +
+// runSingleStreamVDiff diffs the only vreplication stream of this engine. +// It returns nil without diffing unless exactly one controller exists, and an +// error instead of dereferencing a nil controller. +func (vre *Engine) runSingleStreamVDiff(ctx context.Context) error { + // Read the controllers map under vre.mu, the mutex that Close also takes. + vre.mu.Lock() + numStreams := len(vre.controllers) + // This is using the vreplication id that starts at 1 + ctrl := vre.controllers[1] + vre.mu.Unlock() + if numStreams != 1 { + log.Infof("There are no replication streams, nothing to diff") + return nil + } + if ctrl == nil { + return errors.New("vreplication controller for stream id 1 is nil, this shouldn't happen") + } + return ctrl.runVDiff(ctx) +} +
+// AbortVdiff cancels the context of the vdiff run in progress, if any, and marks the engine as not running. +func (vre *Engine) AbortVdiff() { + vre.vdiffMu.Lock() + defer vre.vdiffMu.Unlock() + if !vre.isVDiffRunning { + return + } + vre.isVDiffRunning = false + vre.vdiffCancel() +} + +// VDiffReportStatus returns, as indented JSON, whether a vdiff is running plus either the last error or the last published report. +func (vre *Engine) VDiffReportStatus() ([]byte, error) { + vre.vdiffMu.Lock() + defer vre.vdiffMu.Unlock() + resp := make(map[string]interface{}) + if vre.isVDiffRunning { + resp["status"] = "running" + } else { + resp["status"] = "not_running" + } + if vre.vdiffError != nil { + resp["last_error"] = vre.vdiffError.Error() + return json.MarshalIndent(resp, "", " ") + } + // Read the published report once so the nil check and the stored value agree. + if report := VDiffStatus(); report != nil { + resp["last_diff"] = report + } + return json.MarshalIndent(resp, "", " ") +} + // Close closes the Engine service.
func (vre *Engine) Close() { vre.mu.Lock() @@ -633,7 +704,7 @@ func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error { } if current.AtLeast(mPos) { - log.Infof("position: %s reached, wait time: %v", pos, time.Since(start)) + log.Infof("position: %s reached, wait time: %v", current, time.Since(start)) return nil } diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go index 0d1981f8246..ea021d840f9 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go @@ -41,7 +41,7 @@ func TestEngineOpen(t *testing.T) { // Test Insert - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) if vre.IsOpen() { t.Errorf("IsOpen: %v, want false", vre.IsOpen()) } @@ -89,7 +89,7 @@ func TestEngineExec(t *testing.T) { // Test Insert - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); err != nil { @@ -249,7 +249,7 @@ func TestEngineBadInsert(t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); err != nil { @@ -279,7 +279,7 @@ func TestEngineSelect(t *testing.T) { dbClientFactory := func() 
binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); err != nil { @@ -314,7 +314,7 @@ func TestWaitForPos(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} dbClientFactory := func() binlogplayer.DBClient { return dbClient } - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); err != nil { @@ -344,7 +344,7 @@ func TestWaitForPosError(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} dbClientFactory := func() binlogplayer.DBClient { return dbClient } - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) err := vre.WaitForPos(context.Background(), 1, "MariaDB/0-1-1084") want := `vreplication engine is closed` @@ -386,7 +386,7 @@ func TestWaitForPosCancel(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} dbClientFactory := func() binlogplayer.DBClient { return dbClient } - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) dbClient.ExpectRequest("select * 
from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); err != nil { @@ -434,7 +434,7 @@ func TestCreateDBAndTable(t *testing.T) { // Test Insert - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) tableNotFound := mysql.SQLError{Num: 1146, Message: "table not found"} dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", nil, &tableNotFound) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 40072569e30..ac66597cda9 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -103,7 +103,7 @@ func TestMain(m *testing.M) { InitVStreamerClient(env.Dbcfgs) - playerEngine = NewEngine(env.TopoServ, env.Cells[0], env.Mysqld, realDBClientFactory, vrepldb) + playerEngine = NewEngine(env.TopoServ, env.Cells[0], nil, env.Mysqld, realDBClientFactory, vrepldb) if err := playerEngine.Open(context.Background()); err != nil { fmt.Fprintf(os.Stderr, "%v", err) return 1 diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdiff.go b/go/vt/vttablet/tabletmanager/vreplication/vdiff.go new file mode 100644 index 00000000000..69dc3db4c1b --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vreplication/vdiff.go @@ -0,0 +1,838 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/golang/protobuf/proto" + "golang.org/x/net/context" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/srvtopo" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/engine" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +// vdiff holds the state of one VDiff run for a single vreplication stream: the stream's source, the source and target VStreamer clients, the per-table differs and the accumulated reports. +type vdiff struct { + id uint32 + dbClient *vdbClient + // source is the binlog source of the stream being diffed. + source *binlogdatapb.BinlogSource + sourceVStreamer VStreamerClient + targetVStreamer VStreamerClient + + stats *binlogplayer.Stats + vre *Engine + + differs map[string]*tableDiffer + sourceDf *dfParams + targetDf *dfParams + + tmc tmclient.TabletManagerClient + workflow string + + reportMu sync.Mutex // reportMu is held by VDiff while it updates totalSummary and diffReports. + totalSummary DiffReport + diffReports map[string]*DiffReport +} + +// newVDiffer creates a vdiff for the stream identified by id. targetVStreamer is not set here; VDiff assigns it. +func newVDiffer(id uint32, source *binlogdatapb.BinlogSource, sourceVStreamer VStreamerClient, stats *binlogplayer.Stats, dbClient binlogplayer.DBClient, vre *Engine, workflow string) *vdiff { + return &vdiff{ + id: id, + source: source, + sourceVStreamer: sourceVStreamer, + stats: stats, + dbClient: newVDBClient(dbClient, stats), + vre: vre, + tmc: tmclient.NewTabletManagerClient(), + workflow: workflow, + diffReports: make(map[string]*DiffReport), + } +} + +// DatabaseReport is the cumulative result of a VDiff run: the summed totals plus one DiffReport per table. +type DatabaseReport struct { + GlobalSummary DiffReport
`json:"total_summary,omitempty"` + TablesReport map[string]*DiffReport `json:"table_diffs,omitempty"` +} + +// DiffReport is the summary of differences for one table. +type DiffReport struct { + ProcessedRows int + MatchingRows int + MismatchedRows int + ExtraRowsSource int + ExtraRowsTarget int +} + +// tableDiffer holds the plan for diffing one table: the source and target queries, the columns to compare and the primitives that read each side. +type tableDiffer struct { + targetTable string + sourceExpression string + targetExpression string + compareCols []int + comparePKs []int + comparePKNames []string + sourcePrimitive engine.Primitive + targetPrimitive engine.Primitive +} + +// dfParams is the streaming state VDiff keeps for one side of the comparison (source or target). +type dfParams struct { + master *topo.TabletInfo + vstreamer VStreamerClient + position mysql.Position + snapshotPosition string + result chan *sqltypes.Result + err error +} + +// currentDatabaseReportMu guards currentDatabaseReport. +var currentDatabaseReportMu sync.Mutex + +// currentDatabaseReport is the report most recently published by VDiff; it is read through VDiffStatus. +var currentDatabaseReport *DatabaseReport + +// VDiff diffs every table selected by the stream's filter, comparing the rows +// read from the source against those read from the target, and publishes the +// running totals after each table. filteredReplicationWaitTime bounds each wait. +func (df *vdiff) VDiff(ctx context.Context, filteredReplicationWaitTime time.Duration) error { + df.targetVStreamer = NewTabletVStreamerClient(df.vre.tablet, df.vre.mysqld) + + df.sourceVStreamer.Open(ctx) // NOTE(review): the results of Open/Close are discarded here; confirm whether they can return errors. + df.targetVStreamer.Open(ctx) + defer func() { + df.sourceVStreamer.Close(context.Background()) + df.targetVStreamer.Close(context.Background()) + }() + + tablet, err := df.vre.ts.GetTablet(ctx, df.vre.tablet.GetAlias()) + if err != nil { + return err + } + + targetShard, err := df.vre.ts.GetShard(ctx, tablet.GetKeyspace(), tablet.GetShard()) + if err != nil { + return err + } + + targetMaster, err := df.vre.ts.GetTablet(ctx, targetShard.MasterAlias) + if err != nil { + return err + } + + df.sourceDf = &dfParams{ + vstreamer: df.sourceVStreamer, + } + + df.targetDf = &dfParams{ + master: targetMaster, + vstreamer: df.targetVStreamer, + } + + schm, err := df.getSchema(ctx, df.vre.tablet.GetAlias(), nil, nil, false) + + if err != nil { + return vterrors.Wrap(err, "GetSchema") + } + + df.differs, err = buildVDiffPlan(ctx, df.source.Filter, schm) + if err != nil { + return err + } + + defer func() { + if err := df.restartTarget(context.Background()); err != nil { +
log.Errorf("Could not restart workflow %s: %v, please restart it manually", df.workflow, err) + } + }() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + for table, td := range df.differs { + if err := df.stopTarget(ctx); err != nil { + return vterrors.Wrap(err, "stopTargets") + } + sourceReader, err := df.startQueryStreams(ctx, df.sourceDf, td.sourceExpression, filteredReplicationWaitTime) + if err != nil { + return vterrors.Wrap(err, "startQueryStreams(sources)") + } + if err := df.syncTargets(ctx, filteredReplicationWaitTime); err != nil { + return vterrors.Wrap(err, "syncTargets") + } + targetReader, err := df.startQueryStreams(ctx, df.targetDf, td.targetExpression, filteredReplicationWaitTime) + if err != nil { + return vterrors.Wrap(err, "startQueryStreams(targets)") + } + if err := df.restartTarget(ctx); err != nil { + return vterrors.Wrap(err, "restartTarget") + } + dr, err := td.diff(ctx, sourceReader, targetReader) + if err != nil { + return vterrors.Wrap(err, "diff") + } + log.Infof("Summary for %v: %+v\n", td.targetTable, *dr) + func() { + df.reportMu.Lock() + defer df.reportMu.Unlock() + df.totalSummary.MatchingRows += dr.MatchingRows + df.totalSummary.ProcessedRows += dr.ProcessedRows + df.totalSummary.MismatchedRows += dr.MismatchedRows + df.totalSummary.ExtraRowsSource += dr.ExtraRowsSource + df.totalSummary.ExtraRowsTarget += dr.ExtraRowsTarget + df.diffReports[table] = dr + // Publish a copy of the per-table map: readers marshal the published + // report while this loop keeps adding entries to df.diffReports. + tables := make(map[string]*DiffReport, len(df.diffReports)) + for name, report := range df.diffReports { + tables[name] = report + } + currentDatabaseReportMu.Lock() + currentDatabaseReport = &DatabaseReport{ + GlobalSummary: df.totalSummary, + TablesReport: tables, + } + currentDatabaseReportMu.Unlock() + }() + } + log.Infof("Total Diffs: processed: %v, matched: %v, extra_rows_source: %v, extra_rows_target: %v, mistmatched_rows: %v", df.totalSummary.ProcessedRows, df.totalSummary.MatchingRows, df.totalSummary.ExtraRowsSource, df.totalSummary.ExtraRowsTarget, df.totalSummary.MismatchedRows) + return nil +} + +// VDiffStatus returns the report published by the most recent VDiff progress +// update, or nil if none has been published. +func VDiffStatus() *DatabaseReport { + currentDatabaseReportMu.Lock() + defer currentDatabaseReportMu.Unlock() + return currentDatabaseReport +} + +func buildVDiffPlan(ctx context.Context, filter *binlogdatapb.Filter, schm
*tabletmanagerdatapb.SchemaDefinition) (map[string]*tableDiffer, error) { // buildVDiffPlan builds one tableDiffer per schema table that filter matches; unmatched tables are skipped. + differs := make(map[string]*tableDiffer) + for _, table := range schm.TableDefinitions { + rule, err := MatchTable(table.Name, filter) + if err != nil { + return nil, err + } + if rule == nil { + continue + } + query := rule.Filter + if rule.Filter == "" || key.IsKeyRange(rule.Filter) { // An empty or keyrange-only filter means the whole table is compared. + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("select * from %v", sqlparser.NewTableIdent(table.Name)) + query = buf.String() + } + differs[table.Name], err = buildDifferPlan(table, query) + if err != nil { + return nil, err + } + } + return differs, nil +} + +func buildDifferPlan(table *tabletmanagerdatapb.TableDefinition, query string) (*tableDiffer, error) { // buildDifferPlan derives, from the select in query, the source and target queries and the compare/PK column indexes for table. + statement, err := sqlparser.Parse(query) + if err != nil { + return nil, err + } + sel, ok := statement.(*sqlparser.Select) + if !ok { + return nil, fmt.Errorf("unexpected: %v", sqlparser.String(statement)) + } + td := &tableDiffer{ + targetTable: table.Name, + } + sourceSelect := &sqlparser.Select{} + targetSelect := &sqlparser.Select{} + var aggregates []engine.AggregateParams + for _, selExpr := range sel.SelectExprs { + switch selExpr := selExpr.(type) { + case *sqlparser.StarExpr: + for _, fld := range table.Fields { + aliased := &sqlparser.AliasedExpr{Expr: &sqlparser.ColName{Name: sqlparser.NewColIdent(fld.Name)}} + sourceSelect.SelectExprs = append(sourceSelect.SelectExprs, aliased) + targetSelect.SelectExprs = append(targetSelect.SelectExprs, aliased) + } + case *sqlparser.AliasedExpr: + var targetCol *sqlparser.ColName + if !selExpr.As.IsEmpty() { + targetCol = &sqlparser.ColName{Name: selExpr.As} + } else { + if colAs, ok := selExpr.Expr.(*sqlparser.ColName); ok { + targetCol = colAs + } else { + return nil, fmt.Errorf("expression needs an alias: %v", sqlparser.String(selExpr)) + } + } + sourceSelect.SelectExprs = append(sourceSelect.SelectExprs, selExpr) + targetSelect.SelectExprs = append(targetSelect.SelectExprs,
&sqlparser.AliasedExpr{Expr: targetCol}) + + // Check if it's an aggregate expression + if expr, ok := selExpr.Expr.(*sqlparser.FuncExpr); ok { + switch fname := expr.Name.Lowered(); fname { + case "count", "sum": + aggregates = append(aggregates, engine.AggregateParams{ + Opcode: engine.SupportedAggregates[fname], + Col: len(sourceSelect.SelectExprs) - 1, + }) + } + } + default: + return nil, fmt.Errorf("unexpected: %v", sqlparser.String(statement)) + } + } + fields := make(map[string]querypb.Type) + for _, field := range table.Fields { + fields[strings.ToLower(field.Name)] = field.Type + } + + td.compareCols = make([]int, len(sourceSelect.SelectExprs)) + for i := range td.compareCols { + colname := targetSelect.SelectExprs[i].(*sqlparser.AliasedExpr).Expr.(*sqlparser.ColName).Name.Lowered() + typ, ok := fields[colname] + if !ok { + return nil, fmt.Errorf("column %v not found in table %v", colname, table.Name) + } + td.compareCols[i] = i + if sqltypes.IsText(typ) { // Text columns are compared through an extra wrapWeightString expression appended to both selects. + sourceSelect.SelectExprs = append(sourceSelect.SelectExprs, wrapWeightString(sourceSelect.SelectExprs[i])) + targetSelect.SelectExprs = append(targetSelect.SelectExprs, wrapWeightString(targetSelect.SelectExprs[i])) + td.compareCols[i] = len(sourceSelect.SelectExprs) - 1 + } + } + + sourceSelect.From = sel.From + targetSelect.From = sqlparser.TableExprs{ + &sqlparser.AliasedTableExpr{ + Expr: &sqlparser.TableName{ + Name: sqlparser.NewTableIdent(table.Name), + }, + }, + } + + var orderby sqlparser.OrderBy + for _, pk := range table.PrimaryKeyColumns { + found := false + for i, selExpr := range targetSelect.SelectExprs[:len(td.compareCols)] { // Only the original columns: the expressions appended above for text columns have no compareCols entry. + colname := selExpr.(*sqlparser.AliasedExpr).Expr.(*sqlparser.ColName).Name.Lowered() + if strings.EqualFold(pk, colname) { // colname is lower-cased, so match the primary key name case-insensitively. + td.comparePKs = append(td.comparePKs, td.compareCols[i]) + td.comparePKNames = append(td.comparePKNames, colname) + // We'll be comparing pks separately. So, remove them from compareCols. + td.compareCols[i] = -1 + found = true + break + } + } + if !found { + // Unreachable.
+ return nil, fmt.Errorf("column %v not found in table %v", pk, table.Name) + } + orderby = append(orderby, &sqlparser.Order{ + Expr: &sqlparser.ColName{Name: sqlparser.NewColIdent(pk)}, + Direction: sqlparser.AscScr, + }) + } + sourceSelect.Where = removeKeyrange(sel.Where) + sourceSelect.GroupBy = sel.GroupBy + sourceSelect.OrderBy = orderby + + targetSelect.OrderBy = orderby + + td.sourceExpression = sqlparser.String(sourceSelect) + td.targetExpression = sqlparser.String(targetSelect) + + td.sourcePrimitive = newMergeSorter(td.comparePKs) + td.targetPrimitive = newMergeSorter(td.comparePKs) + if len(aggregates) != 0 { // Only the source side is wrapped in an OrderedAggregate. + td.sourcePrimitive = &engine.OrderedAggregate{ + Aggregates: aggregates, + Keys: td.comparePKs, + Input: td.sourcePrimitive, + } + } + + return td, nil +} + +func (df *vdiff) startQueryStreams(ctx context.Context, participant *dfParams, query string, filteredReplicationWaitTime time.Duration) (*resultReader, error) { // startQueryStreams waits for participant to reach its recorded position, starts streaming query from it and returns a reader over the results. + waitCtx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime) + defer cancel() + // Wait, for at most filteredReplicationWaitTime, until the participant has reached participant.position. + + if err := participant.vstreamer.WaitForPosition(waitCtx, mysql.EncodePosition(participant.position)); err != nil { + return nil, vterrors.Wrapf(err, "WaitForPosition for tablet %v", participant.vstreamer) + } + participant.result = make(chan *sqltypes.Result, 1) + gtidch := make(chan string, 1) + + // Start the stream in a separate goroutine. + go df.streamOne(ctx, participant, query, gtidch) + + // Wait for the gtid to be sent. If it's not received, there was an error + // which would be stored in participant.err. NOTE(review): if the stream ends without sending fields and without an error, this returns (nil, nil); confirm callers tolerate that. + gtid, ok := <-gtidch + if !ok { + return nil, participant.err + } + // Save the new position, as of when the query executed. + participant.snapshotPosition = gtid + return newResultReader(ctx, participant), nil +} + +// streamOne is called as a goroutine, and communicates its results through channels. +// It first sends the snapshot gtid to gtidch.
+// Then it streams results to participant.result. +// Before returning, it sets participant.err, and closes all channels. +// If any channel is closed, then participant.err can be checked if there was an error. +func (df *vdiff) streamOne(ctx context.Context, participant *dfParams, query string, gtidch chan string) { + defer close(participant.result) + defer close(gtidch) + + // Wrap the streaming in a separate function so we can capture the error. + // This shows that the error will be set before the channels are closed. + participant.err = func() error { + var fields []*querypb.Field + err := participant.vstreamer.VStreamResults(ctx, query, func(vrs *binlogdatapb.VStreamResultsResponse) error { + if vrs.Fields != nil { + fields = vrs.Fields + gtidch <- vrs.Gtid // gtidch is created with capacity 1 by startQueryStreams, so the first send does not wait for the receiver. + } + p3qr := &querypb.QueryResult{ + Fields: fields, + Rows: vrs.Rows, + } + result := sqltypes.Proto3ToResult(p3qr) + // Fields should be received only once, and sent only once. + if vrs.Fields == nil { + result.Fields = nil + } + select { // Hand the result to the reader, or give up once ctx is done. + case participant.result <- result: + case <-ctx.Done(): + return vterrors.Wrap(ctx.Err(), "VStreamResults") + } + return nil + }) + return err + }() +} + +func (df *vdiff) syncTargets(ctx context.Context, filteredReplicationWaitTime time.Duration) error { // syncTargets runs the target stream up to the source snapshot position, waits for it, then records the target master's position in df.targetDf.position. + waitCtx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime) + defer cancel() + pos := df.sourceDf.snapshotPosition + query := fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for vdiff' where id=%d", pos, df.id) + if _, err := df.tmc.VReplicationExec(ctx, df.targetDf.master.Tablet, query); err != nil { + return err + } + + if err := df.tmc.VReplicationWaitForPos(waitCtx, df.targetDf.master.Tablet, int(df.id), pos); err != nil { + return vterrors.Wrapf(err, "VReplicationWaitForPos for tablet %v", topoproto.TabletAliasString(df.targetDf.master.Tablet.Alias)) + } + log.Infof("VReplication successfully stopped at position: %v", pos) + + pos, err :=
df.tmc.MasterPosition(ctx, df.targetDf.master.Tablet) + if err != nil { + return err + } + mpos, err := mysql.DecodePosition(pos) + if err != nil { + return err + } + df.targetDf.position = mpos + return nil +} + +func (df *vdiff) restartTarget(ctx context.Context) error { // restartTarget sets this workflow's streams on the target master back to 'Running' and clears their message and stop_pos. + query := fmt.Sprintf("update _vt.vreplication set state='Running', message='', stop_pos='' where db_name=%s and workflow=%s", encodeString(df.targetDf.master.DbName()), encodeString(df.workflow)) + _, err := df.tmc.VReplicationExec(ctx, df.targetDf.master.Tablet, query) + return err +} + +func (df *vdiff) getSchema(ctx context.Context, tabletAlias *topodatapb.TabletAlias, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { // getSchema resolves tabletAlias through the topo server and fetches that tablet's schema via the tablet manager client. + ti, err := df.vre.ts.GetTablet(ctx, tabletAlias) + if err != nil { + return nil, fmt.Errorf("GetTablet(%v) failed: %v", tabletAlias, err) + } + + return df.tmc.GetSchema(ctx, ti.Tablet, tables, excludeTables, includeViews) +} + +//----------------------------------------------------------------- +// primitiveExecutor runs a primitive's StreamExecute in a goroutine and hands out the streamed rows one at a time. + +type primitiveExecutor struct { + prim engine.Primitive + rows [][]sqltypes.Value + resultch chan *sqltypes.Result + err error +} + +func newPrimitiveExecutor(ctx context.Context, vcursor engine.VCursor, prim engine.Primitive) *primitiveExecutor { // newPrimitiveExecutor starts streaming prim right away; resultch is closed when StreamExecute returns, after pe.err has been set. + pe := &primitiveExecutor{ + prim: prim, + resultch: make(chan *sqltypes.Result, 1), + } + go func() { + defer close(pe.resultch) + pe.err = pe.prim.StreamExecute(vcursor, make(map[string]*querypb.BindVariable), false, func(qr *sqltypes.Result) error { + select { + case pe.resultch <- qr: + case <-ctx.Done(): + return vterrors.Wrap(ctx.Err(), "Outer Stream") + } + return nil + }) + }() + return pe +} + +func (pe *primitiveExecutor) next() ([]sqltypes.Value, error) { // next returns the next row; once resultch is closed it returns a nil row together with pe.err. + for len(pe.rows) == 0 { + qr, ok := <-pe.resultch + if !ok { + return nil, pe.err + } + pe.rows = qr.Rows + } + + row := pe.rows[0] + pe.rows = pe.rows[1:] + return row, nil +} + +func (pe
*primitiveExecutor) drain(ctx context.Context) (int, error) { // drain consumes the remaining rows and returns how many there were, or 0 and the error if next fails; ctx is not used. + count := 0 + for { + row, err := pe.next() + if err != nil { + return 0, err + } + if row == nil { + return count, nil + } + count++ + } +} + +//----------------------------------------------------------------- +// mergeSorter + +var _ engine.Primitive = (*mergeSorter)(nil) + +// mergeSorter performs a merge-sorted read from the participants. It embeds engine.Primitive without setting it; StreamExecute is the only method defined on it in the code visible here. +type mergeSorter struct { + engine.Primitive + orderBy []engine.OrderbyParams +} + +func newMergeSorter(comparePKs []int) *mergeSorter { // newMergeSorter builds a mergeSorter ordering by the given column indexes, in the given order. + ob := make([]engine.OrderbyParams, 0, len(comparePKs)) + for _, col := range comparePKs { + ob = append(ob, engine.OrderbyParams{Col: col}) + } + return &mergeSorter{ + orderBy: ob, + } +} + +func (ms *mergeSorter) StreamExecute(vcursor engine.VCursor, bindVars map[string]*querypb.BindVariable, wantields bool, callback func(*sqltypes.Result) error) error { + // TODO: a merge sort is not strictly needed here because there is only a single stream; engine.MergeSort is reused for simplicity. + _, ok := vcursor.(*resultReader) + if !ok { + return fmt.Errorf("internal error: vcursor is not a resultReader: %T", vcursor) + } + rss := make([]*srvtopo.ResolvedShard, 0, 1) + bvs := make([]map[string]*querypb.BindVariable, 0, 1) + rss = append(rss, &srvtopo.ResolvedShard{ + Target: &querypb.Target{ + Shard: "-", + }, + }) + bvs = append(bvs, bindVars) + return engine.MergeSort(vcursor, "", ms.orderBy, rss, bvs, callback) +} + +//----------------------------------------------------------------- +// resultReader + +// resultReader acts as a VCursor for the wrapping primitives. It embeds engine.VCursor and defines Context and StreamExecuteMulti itself.
+type resultReader struct { + engine.VCursor + ctx context.Context + participant *dfParams +} + +func newResultReader(ctx context.Context, participant *dfParams) *resultReader { + return &resultReader{ + ctx: ctx, + participant: participant, + } +} + +func (rr *resultReader) Context() context.Context { + return rr.ctx +} + +func (rr *resultReader) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error { // StreamExecuteMulti ignores query, rss and bindVars and passes callback every result received on participant.result until that channel is closed. + for result := range rr.participant.result { + if err := callback(result); err != nil { + return err + } + } + return nil +} + +//----------------------------------------------------------------- +// tableDiffer + +func (td *tableDiffer) diff(ctx context.Context, sourceReader, targetReader *resultReader) (*DiffReport, error) { + sourceExecutor := newPrimitiveExecutor(ctx, sourceReader, td.sourcePrimitive) + targetExecutor := newPrimitiveExecutor(ctx, targetReader, td.targetPrimitive) + dr := &DiffReport{} + var sourceRow, targetRow []sqltypes.Value + var err error + advanceSource := true + advanceTarget := true + for { + if advanceSource { + sourceRow, err = sourceExecutor.next() + if err != nil { + return nil, err + } + } + if advanceTarget { + targetRow, err = targetExecutor.next() + if err != nil { + return nil, err + } + } + + if sourceRow == nil && targetRow == nil { + return dr, nil + } + + advanceSource = true + advanceTarget = true + + if sourceRow == nil { + // drain target, update count + pk := td.pkString(targetRow) + log.Errorf("Draining extra row(s) found on the target.
This is the extra row pk: %v", pk) + count, err := targetExecutor.drain(ctx) + if err != nil { + return nil, err + } + dr.ExtraRowsTarget += 1 + count + dr.ProcessedRows += 1 + count + return dr, nil + } + if targetRow == nil { + // no more rows from the target + // we know we have rows from source, drain, update count + pk := td.pkString(sourceRow) + log.Errorf("Draining extra row(s) found on the source. This is the extra row pk: %v", pk) + count, err := sourceExecutor.drain(ctx) + if err != nil { + return nil, err + } + dr.ExtraRowsSource += 1 + count + dr.ProcessedRows += 1 + count + return dr, nil + } + + dr.ProcessedRows++ + + // Compare pk values. + c, _, err := td.compare(sourceRow, targetRow, td.comparePKs) + switch { + case err != nil: + return nil, err + case c < 0: + if dr.ExtraRowsSource < 10 { + log.Errorf("[table=%v] Extra row %v on source: %v", td.targetTable, dr.ExtraRowsSource, td.pkString(sourceRow)) + } + dr.ExtraRowsSource++ + advanceTarget = false + continue + case c > 0: + if dr.ExtraRowsTarget < 10 { + log.Errorf("[table=%v] Extra row %v on target: %v", td.targetTable, dr.ExtraRowsTarget, td.pkString(targetRow)) + } + dr.ExtraRowsTarget++ + advanceSource = false + continue + } + + // c == 0 + // Compare non-pk values. 
+ c, _, err = td.compare(sourceRow, targetRow, td.compareCols) + switch { + case err != nil: + return nil, err + case c != 0: + if dr.MismatchedRows < 10 { + log.Errorf("[table=%v] Different content for PK: %v", td.targetTable, td.pkString(sourceRow)) + } + dr.MismatchedRows++ + default: + dr.MatchingRows++ + } + } +} + +func (td *tableDiffer) pkString(targetRow []sqltypes.Value) string { + pk := "" + for index, col := range td.comparePKs { + if col == -1 { + continue + } + pk += fmt.Sprintf("%v:%v", td.comparePKNames[index], targetRow[col].String()) + if index < len(td.comparePKs)-1 { + pk += ", " + } + } + return pk +} + +func (td *tableDiffer) compare(sourceRow, targetRow []sqltypes.Value, cols []int) (int, int, error) { + for _, col := range cols { + if col == -1 { + continue + } + c, err := sqltypes.NullsafeCompare(sourceRow[col], targetRow[col]) + if err != nil { + return 0, 0, err + } + if c != 0 { + return c, col, nil + } + } + return 0, 0, nil +} + +func removeKeyrange(where *sqlparser.Where) *sqlparser.Where { + if where == nil { + return nil + } + if isFuncKeyrange(where.Expr) { + return nil + } + where.Expr = removeExprKeyrange(where.Expr) + return where +} + +func removeExprKeyrange(node sqlparser.Expr) sqlparser.Expr { + switch node := node.(type) { + case *sqlparser.AndExpr: + if isFuncKeyrange(node.Left) { + return removeExprKeyrange(node.Right) + } + if isFuncKeyrange(node.Right) { + return removeExprKeyrange(node.Left) + } + return &sqlparser.AndExpr{ + Left: removeExprKeyrange(node.Left), + Right: removeExprKeyrange(node.Right), + } + case *sqlparser.ParenExpr: + return &sqlparser.ParenExpr{ + Expr: removeExprKeyrange(node.Expr), + } + } + return node +} + +func isFuncKeyrange(expr sqlparser.Expr) bool { + funcExpr, ok := expr.(*sqlparser.FuncExpr) + return ok && funcExpr.Name.EqualString("in_keyrange") +} + +func wrapWeightString(expr sqlparser.SelectExpr) *sqlparser.AliasedExpr { + return &sqlparser.AliasedExpr{ + Expr: &sqlparser.FuncExpr{ + 
Name: sqlparser.NewColIdent("weight_string"), + Exprs: []sqlparser.SelectExpr{ + &sqlparser.AliasedExpr{ + Expr: expr.(*sqlparser.AliasedExpr).Expr, + }, + }, + }, + } +} + +func (df *vdiff) stopTarget(ctx context.Context) error { + var mu sync.Mutex + + query := fmt.Sprintf("update _vt.vreplication set state='Stopped', message='for vdiff' where db_name=%s and workflow=%s", encodeString(df.targetDf.master.DbName()), encodeString(df.workflow)) + _, err := df.tmc.VReplicationExec(ctx, df.targetDf.master.Tablet, query) + if err != nil { + return err + } + query = fmt.Sprintf("select source, pos from _vt.vreplication where db_name=%s and workflow=%s", encodeString(df.targetDf.master.DbName()), encodeString(df.workflow)) + p3qr, err := df.tmc.VReplicationExec(ctx, df.targetDf.master.Tablet, query) + if err != nil { + return err + } + qr := sqltypes.Proto3ToResult(p3qr) + + for _, row := range qr.Rows { + var bls binlogdatapb.BinlogSource + if err := proto.UnmarshalText(row[0].ToString(), &bls); err != nil { + return err + } + pos, err := mysql.DecodePosition(row[1].ToString()) + if err != nil { + return err + } + func() { + mu.Lock() + defer mu.Unlock() + + // if bls.Shard != df.shard { + // // Unreachable. 
+ // return + // } + if !df.sourceDf.position.IsZero() && df.sourceDf.position.AtLeast(pos) { + return + } + df.sourceDf.position = pos + }() + } + return nil +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go b/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go index 309d901ef55..ee0a86a2198 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go @@ -23,9 +23,12 @@ import ( "golang.org/x/net/context" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/mysqlctl" + "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletconn" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" @@ -55,6 +58,12 @@ type VStreamerClient interface { // VStreamRows streams rows of a table from the specified starting point. VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error + + // VStreamResults streams results along with the gtid of the snapshot. + VStreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error + + // WaitForPosition ... 
+ WaitForPosition(ctx context.Context, pos string) error } // TabletVStreamerClient a vstream client backed by vttablet @@ -65,6 +74,7 @@ type TabletVStreamerClient struct { isOpen bool tablet *topodatapb.Tablet + mysqld mysqlctl.MysqlDaemon target *querypb.Target tsQueryService queryservice.QueryService } @@ -77,13 +87,16 @@ type MySQLVStreamerClient struct { isOpen bool sourceConnParams dbconfigs.Connector - sourceSe *schema.Engine + // TODO(@setassociative, merge resolution) + // sourceCp *mysql.ConnParams + sourceSe *schema.Engine } // NewTabletVStreamerClient creates a new TabletVStreamerClient -func NewTabletVStreamerClient(tablet *topodatapb.Tablet) *TabletVStreamerClient { +func NewTabletVStreamerClient(tablet *topodatapb.Tablet, mysqld mysqlctl.MysqlDaemon) *TabletVStreamerClient { return &TabletVStreamerClient{ tablet: tablet, + mysqld: mysqld, target: &querypb.Target{ Keyspace: tablet.Keyspace, Shard: tablet.Shard, @@ -132,6 +145,24 @@ func (vsClient *TabletVStreamerClient) VStreamRows(ctx context.Context, query st return vsClient.tsQueryService.VStreamRows(ctx, vsClient.target, query, lastpk, send) } +// VStreamResults part of the VStreamerClient interface +func (vsClient *TabletVStreamerClient) VStreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error { + if !vsClient.isOpen { + return errors.New("can't VStreamResults without opening client") + } + vsClient.target.TabletType = topodatapb.TabletType_MASTER + return vsClient.tsQueryService.VStreamResults(ctx, vsClient.target, query, send) +} + +// WaitForPosition decodes pos and delegates the wait to mysqld.WaitMasterPos. +func (vsClient *TabletVStreamerClient) WaitForPosition(ctx context.Context, pos string) error { + targetPos, err := mysql.DecodePosition(pos) + if err != nil { + return err + } + return vsClient.mysqld.WaitMasterPos(ctx, targetPos) +} + // NewMySQLVStreamerClient is a vstream client that allows you to stream directly from MySQL. 
// In order to achieve this, the following creates a vstreamer Engine with a dummy in memorytopo. func NewMySQLVStreamerClient() *MySQLVStreamerClient { @@ -141,6 +172,8 @@ func NewMySQLVStreamerClient() *MySQLVStreamerClient { // TODO: For now external mysql streams can only be used with ExternalReplWithDB creds. // In the future we will support multiple users. vsClient := &MySQLVStreamerClient{ + // TODO(@setassociative, merge resolution) + // sourceCp: dbcfgs.ExternalReplWithDB(), sourceConnParams: dbcfgs.ExternalReplWithDB(), } return vsClient @@ -159,7 +192,11 @@ func (vsClient *MySQLVStreamerClient) Open(ctx context.Context) (err error) { config := tabletenv.NewDefaultConfig() vsClient.sourceSe = schema.NewEngine(tabletenv.NewTestEnv(config, nil, "VStreamerClientTest")) + // TODO(@setassociative, merge resolution): + // vsClient.sourceSe = schema.NewEngine(checker{}, tabletenv.DefaultQsConfig) + // vsClient.sourceSe.InitDBConfig(vsClient.sourceCp) vsClient.sourceSe.InitDBConfig(vsClient.sourceConnParams) + err = vsClient.sourceSe.Open() if err != nil { return err @@ -185,7 +222,9 @@ func (vsClient *MySQLVStreamerClient) VStream(ctx context.Context, startPos stri if !vsClient.isOpen { return errors.New("can't VStream without opening client") } - streamer := vstreamer.NewVStreamer(ctx, vsClient.sourceConnParams, vsClient.sourceSe, startPos, filter, send) + // TODO(@setassociative, merge resolution) + // streamer := vstreamer.NewVStreamer(ctx, vsClient.sourceCp, vsClient.sourceSe, startPos, filter, send) + streamer := vstreamer.NewVStreamer(ctx, vsClient.sourceConnParams, vsClient.sourceSe, startPos, filter, &vindexes.KeyspaceSchema{}, send) return streamer.Stream() } @@ -202,10 +241,125 @@ func (vsClient *MySQLVStreamerClient) VStreamRows(ctx context.Context, query str } row = r.Rows[0] } - streamer := vstreamer.NewRowStreamer(ctx, vsClient.sourceConnParams, vsClient.sourceSe, query, row, send) + + // TODO(@setassociative, merge resolution) + // streamer := 
vstreamer.NewRowStreamer(ctx, vsClient.sourceCp, vsClient.sourceSe, query, row, send) + streamer := vstreamer.NewRowStreamer(ctx, vsClient.sourceConnParams, vsClient.sourceSe, query, row, &vindexes.KeyspaceSchema{}, send) + return streamer.Stream() +} + +// VStreamResults part of the VStreamerClient interface +func (vsClient *MySQLVStreamerClient) VStreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error { + if !vsClient.isOpen { + return errors.New("can't VStreamResults without opening client") + } + + // TODO(@setassociative, merge resolution) + // streamer := vstreamer.NewResultStreamer(ctx, vsClient.sourceCp, query, send) + streamer := vstreamer.NewResultStreamer(ctx, vsClient.sourceConnParams, query, send) return streamer.Stream() } +// WaitForPosition waits until the source MySQL's master position is at least pos. +func (vsClient *MySQLVStreamerClient) WaitForPosition(ctx context.Context, pos string) error { + targetPos, err := mysql.DecodePosition(pos) + if err != nil { + return err + } + + // Get a connection. + // TODO(@setassociative, merge resolution) + // params, err := dbconfigs.WithCredentials(vsClient.sourceCp) + params, err := dbconfigs.WithCredentials(vsClient.sourceConnParams) + if err != nil { + return err + } + conn, err := mysql.Connect(ctx, params) + if err != nil { + return fmt.Errorf("error in connecting to mysql db, err %v", err) + } + + defer conn.Close() + + // If we are the master, WaitUntilPositionCommand will fail. + // But position is most likely reached. So, check the position + // first. + mpos, err := conn.MasterPosition() + if err != nil { + return fmt.Errorf("WaitMasterPos: MasterPosition failed: %v", err) + } + if mpos.AtLeast(targetPos) { + return nil + } + + // Find the query to run, run it. 
+ query, err := conn.WaitUntilPositionCommand(ctx, targetPos) + if err != nil { + return err + } + qr, err := executeFetchContext(ctx, conn, query, 1, true) + if err != nil { + return fmt.Errorf("WaitUntilPositionCommand(%v) failed: %v", query, err) + } + if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { + return fmt.Errorf("unexpected result format from WaitUntilPositionCommand(%v): %#v", query, qr) + } + result := qr.Rows[0][0] + if result.IsNull() { + return fmt.Errorf("WaitUntilPositionCommand(%v) failed: replication is probably stopped", query) + } + if result.ToString() == "-1" { + return fmt.Errorf("timed out waiting for position %v", targetPos) + } + return nil +} + +func executeFetchContext(ctx context.Context, conn *mysql.Conn, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) { + // Fast fail if context is done. + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + // Execute asynchronously so we can select on both it and the context. + var qr *sqltypes.Result + var executeErr error + done := make(chan struct{}) + go func() { + defer close(done) + + qr, executeErr = conn.ExecuteFetch(query, maxrows, wantfields) + }() + + // Wait for either the query or the context to be done. + select { + case <-done: + return qr, executeErr + case <-ctx.Done(): + // If both are done already, we may end up here anyway because select + // chooses among multiple ready channels pseudorandomly. + // Check the done channel and prefer that one if it's ready. + select { + case <-done: + return qr, executeErr + default: + } + + // Wait for the conn.ExecuteFetch() call to return. + <-done + // Close the connection. Upon Recycle() it will be thrown out. + conn.Close() + // ExecuteFetch() may have succeeded before we tried to kill it. + // If ExecuteFetch() had returned because we cancelled it, + // then executeErr would be an error like "MySQL has gone away". 
+ if executeErr == nil { + return qr, executeErr + } + return nil, ctx.Err() + } +} + // InitVStreamerClient initializes config for vstreamer client func InitVStreamerClient(cfg *dbconfigs.DBConfigs) { dbcfgs = cfg diff --git a/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client_test.go b/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client_test.go index af3e762383b..deecfc9128b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client_test.go @@ -275,6 +275,8 @@ func TestNewMySQLVStreamerClient(t *testing.T) { name: "sets conn params for MySQLVStreamerClient ", want: &MySQLVStreamerClient{ sourceConnParams: dbcfgs.ExternalReplWithDB(), + // TODO(@setassociative, merge resolution) + // sourceCp: env.Dbcfgs.ExternalReplWithDB(), }, }, } @@ -294,6 +296,8 @@ func TestMySQLVStreamerClientOpen(t *testing.T) { }) type fields struct { sourceConnParams dbconfigs.Connector + // TODO(@setassociative, merge resolution) + // sourceConnParams *mysql.ConnParams } type args struct { ctx context.Context @@ -327,7 +331,7 @@ func TestMySQLVStreamerClientOpen(t *testing.T) { for _, tcase := range tests { t.Run(tcase.name, func(t *testing.T) { vsClient := &MySQLVStreamerClient{ - sourceConnParams: tcase.fields.sourceConnParams, + sourceConnParams: tcase.fields.sourceConnParams, } err := vsClient.Open(tcase.args.ctx) @@ -358,6 +362,8 @@ func TestMySQLVStreamerClientClose(t *testing.T) { type fields struct { isOpen bool sourceConnParams dbconfigs.Connector + // TODO(@setassociative, merge resolution) + // sourceConnParams *mysql.ConnParams } type args struct { ctx context.Context @@ -383,8 +389,8 @@ func TestMySQLVStreamerClientClose(t *testing.T) { for _, tcase := range tests { t.Run(tcase.name, func(t *testing.T) { vsClient := &MySQLVStreamerClient{ - isOpen: tcase.fields.isOpen, - sourceConnParams: tcase.fields.sourceConnParams, + isOpen: tcase.fields.isOpen, + sourceConnParams: 
tcase.fields.sourceConnParams, } err := vsClient.Open(tcase.args.ctx) @@ -413,6 +419,8 @@ func TestMySQLVStreamerClientClose(t *testing.T) { func TestMySQLVStreamerClientVStream(t *testing.T) { vsClient := &MySQLVStreamerClient{ sourceConnParams: dbcfgs.ExternalReplWithDB(), + // TODO(@setassociative, merge resolution) + // sourceCp: env.Dbcfgs.ExternalReplWithDB(), } filter := &binlogdatapb.Filter{ @@ -473,6 +481,8 @@ func TestMySQLVStreamerClientVStream(t *testing.T) { func TestMySQLVStreamerClientVStreamRows(t *testing.T) { vsClient := &MySQLVStreamerClient{ sourceConnParams: dbcfgs.ExternalReplWithDB(), + // TODO(@setassociative, merge resolution) + // sourceCp: env.Dbcfgs.ExternalReplWithDB(), } eventsChan := make(chan *querypb.Row, 1000) diff --git a/go/vt/vttablet/tabletserver/api.go b/go/vt/vttablet/tabletserver/api.go new file mode 100644 index 00000000000..7615fda7415 --- /dev/null +++ b/go/vt/vttablet/tabletserver/api.go @@ -0,0 +1,91 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tabletserver + +import ( + "fmt" + "net/http" + + "vitess.io/vitess/go/acl" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" +) + +// This file implements a REST-style API for vttablet's vdiff controls. 
+ +const ( + apiPrefix = "/api/" + + jsonContentType = "application/json; charset=utf-8" +) + +func httpErrorf(w http.ResponseWriter, r *http.Request, format string, args ...interface{}) { + errMsg := fmt.Sprintf(format, args...) + log.Errorf("HTTP error on %v: %v, request: %#v", r.URL.Path, errMsg, r) + http.Error(w, errMsg, http.StatusInternalServerError) +} + +func handleAPI(apiPath string, handlerFunc func(w http.ResponseWriter, r *http.Request) error) { + http.HandleFunc(apiPrefix+apiPath, func(w http.ResponseWriter, r *http.Request) { + defer func() { + if x := recover(); x != nil { + httpErrorf(w, r, "uncaught panic: %v", x) + } + }() + if err := handlerFunc(w, r); err != nil { + httpErrorf(w, r, "%v", err) + } + }) +} + +// InitAPI registers the vdiff start, abort and status handlers under /api/; all three require the ADMIN ACL. +func InitAPI(vrEngine *vreplication.Engine) { + handleAPI("vdiff/start", func(w http.ResponseWriter, r *http.Request) error { + // Starting a vdiff is an action, not a read; require the same ADMIN ACL as vdiff/status. + if err := acl.CheckAccessHTTP(r, acl.ADMIN); err != nil { + http.Error(w, "403 Forbidden", http.StatusForbidden) + return nil + } + vrEngine.RunVdiff() + return nil + }) + + handleAPI("vdiff/abort", func(w http.ResponseWriter, r *http.Request) error { + // Aborting a vdiff is an action, not a read; require the same ADMIN ACL as vdiff/status. 
+ if err := acl.CheckAccessHTTP(r, acl.ADMIN); err != nil { + http.Error(w, "403 Forbidden", http.StatusForbidden) + return nil + } + vrEngine.AbortVdiff() + return nil + }) + + // Features + handleAPI("vdiff/status", func(w http.ResponseWriter, r *http.Request) error { + if err := acl.CheckAccessHTTP(r, acl.ADMIN); err != nil { + http.Error(w, "403 Forbidden", http.StatusForbidden) + return nil + } + resp, err := vrEngine.VDiffReportStatus() + if err != nil { + return fmt.Errorf("json error: %v", err) + } + w.Header().Set("Content-Type", jsonContentType) + w.Write(resp) + return nil + }) +} diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 305b2530f75..35158153691 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -230,7 +230,9 @@ func (vse *Engine) StreamResults(ctx context.Context, query string, send func(*b if !vse.isOpen { return nil, 0, errors.New("VStreamer is not open") } - resultStreamer := newResultStreamer(ctx, 
vse.env.DBConfigs().AppWithDB(), query, send) + // TODO(@setassociative, merge resolution): + // resultStreamer := NewResultStreamer(ctx, vse.cp, query, send) + resultStreamer := NewResultStreamer(ctx, vse.env.DBConfigs().AppWithDB(), query, send) idx := vse.streamIdx vse.resultStreamers[idx] = resultStreamer vse.streamIdx++ diff --git a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go index 141b26f3fa4..c39b354689b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go @@ -20,8 +20,10 @@ import ( "context" "fmt" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/sqlparser" ) @@ -40,7 +42,10 @@ type resultStreamer struct { send func(*binlogdatapb.VStreamResultsResponse) error } -func newResultStreamer(ctx context.Context, cp dbconfigs.Connector, query string, send func(*binlogdatapb.VStreamResultsResponse) error) *resultStreamer { +// TODO(@setassociative, merge resolution) +// func NewResultStreamer(ctx context.Context, cp *mysql.ConnParams, query string, send func(*binlogdatapb.VStreamResultsResponse) error) *resultStreamer { +func NewResultStreamer(ctx context.Context, cp dbconfigs.Connector, query string, send func(*binlogdatapb.VStreamResultsResponse) error) *resultStreamer { + // NewResultStreamer creates a new result streamer ctx, cancel := context.WithCancel(ctx) return &resultStreamer{ ctx: ctx, @@ -128,3 +133,46 @@ func (rs *resultStreamer) Stream() error { return nil } + +// TODO(@setassociative, merge resolution): added the following methods; +// startStreaming seems to be uncalled? 
+func (rs *resultStreamer) startStreaming(conn *mysql.Conn) (string, error) { + lockConn, err := rs.mysqlConnect() + if err != nil { + return "", err + } + // To be safe, always unlock tables, even if lock tables might fail. + defer func() { + _, err := lockConn.ExecuteFetch("unlock tables", 0, false) + if err != nil { + log.Warningf("Unlock tables failed: %v", err) + } else { + log.Infof("Tables unlocked: %v", rs.tableName) + } + lockConn.Close() + }() + + if _, err := lockConn.ExecuteFetch(fmt.Sprintf("lock tables %s read", sqlparser.String(rs.tableName)), 0, false); err != nil { + return "", err + } + pos, err := lockConn.MasterPosition() + if err != nil { + return "", err + } + + if err := conn.ExecuteStreamFetch(rs.query); err != nil { + return "", err + } + + return mysql.EncodePosition(pos), nil +} + +func (rs *resultStreamer) mysqlConnect() (*mysql.Conn, error) { + // TODO(@setassociative, merge resolution) + // cp, err := dbconfigs.WithCredentials(rs.cp) + // if err != nil { + // return nil, err + // } + // return mysql.Connect(rs.ctx, cp) + return rs.cp.Connect(rs.ctx) +} diff --git a/go/vt/wrangler/testlib/migrate_served_from_test.go b/go/vt/wrangler/testlib/migrate_served_from_test.go index 22912050631..ff3a38cb9a1 100644 --- a/go/vt/wrangler/testlib/migrate_served_from_test.go +++ b/go/vt/wrangler/testlib/migrate_served_from_test.go @@ -106,7 +106,7 @@ func TestMigrateServedFrom(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. 
dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - destMaster.Agent.VREngine = vreplication.NewEngine(ts, "", destMaster.FakeMysqlDaemon, dbClientFactory, dbClient.DBName()) + destMaster.Agent.VREngine = vreplication.NewEngine(ts, "", nil, destMaster.FakeMysqlDaemon, dbClientFactory, dbClient.DBName()) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := destMaster.Agent.VREngine.Open(context.Background()); err != nil { t.Fatal(err) diff --git a/go/vt/wrangler/testlib/migrate_served_types_test.go b/go/vt/wrangler/testlib/migrate_served_types_test.go index c7debf1cc6d..790272ec2a0 100644 --- a/go/vt/wrangler/testlib/migrate_served_types_test.go +++ b/go/vt/wrangler/testlib/migrate_served_types_test.go @@ -153,7 +153,7 @@ func TestMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. dbClient1 := binlogplayer.NewMockDBClient(t) dbClientFactory1 := func() binlogplayer.DBClient { return dbClient1 } - dest1Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest1Master.FakeMysqlDaemon, dbClientFactory1, dbClient1.DBName()) + dest1Master.Agent.VREngine = vreplication.NewEngine(ts, "", nil, dest1Master.FakeMysqlDaemon, dbClientFactory1, dbClient1.DBName()) // select * from _vt.vreplication during Open dbClient1.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest1Master.Agent.VREngine.Open(context.Background()); err != nil { @@ -181,7 +181,7 @@ func TestMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. 
dbClient2 := binlogplayer.NewMockDBClient(t) dbClientFactory2 := func() binlogplayer.DBClient { return dbClient2 } - dest2Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest2Master.FakeMysqlDaemon, dbClientFactory2, dbClient2.DBName()) + dest2Master.Agent.VREngine = vreplication.NewEngine(ts, "", nil, dest2Master.FakeMysqlDaemon, dbClientFactory2, dbClient2.DBName()) // select * from _vt.vreplication during Open dbClient2.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest2Master.Agent.VREngine.Open(context.Background()); err != nil { @@ -417,7 +417,7 @@ func TestMultiShardMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. dbClient1 := binlogplayer.NewMockDBClient(t) dbClientFactory1 := func() binlogplayer.DBClient { return dbClient1 } - dest1Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest1Master.FakeMysqlDaemon, dbClientFactory1, "db") + dest1Master.Agent.VREngine = vreplication.NewEngine(ts, "", nil, dest1Master.FakeMysqlDaemon, dbClientFactory1, "db") // select * from _vt.vreplication during Open dbClient1.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest1Master.Agent.VREngine.Open(context.Background()); err != nil { @@ -434,7 +434,7 @@ func TestMultiShardMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. 
dbClient2 := binlogplayer.NewMockDBClient(t) dbClientFactory2 := func() binlogplayer.DBClient { return dbClient2 } - dest2Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest2Master.FakeMysqlDaemon, dbClientFactory2, "db") + dest2Master.Agent.VREngine = vreplication.NewEngine(ts, "", nil, dest2Master.FakeMysqlDaemon, dbClientFactory2, "db") // select * from _vt.vreplication during Open dbClient2.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest2Master.Agent.VREngine.Open(context.Background()); err != nil { @@ -505,7 +505,7 @@ func TestMultiShardMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. dbClient1 = binlogplayer.NewMockDBClient(t) dbClientFactory1 = func() binlogplayer.DBClient { return dbClient1 } - dest3Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest3Master.FakeMysqlDaemon, dbClientFactory1, "db") + dest3Master.Agent.VREngine = vreplication.NewEngine(ts, "", nil, dest3Master.FakeMysqlDaemon, dbClientFactory1, "db") // select * from _vt.vreplication during Open dbClient1.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest3Master.Agent.VREngine.Open(context.Background()); err != nil { @@ -522,7 +522,7 @@ func TestMultiShardMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. 
dbClient2 = binlogplayer.NewMockDBClient(t) dbClientFactory2 = func() binlogplayer.DBClient { return dbClient2 } - dest4Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest4Master.FakeMysqlDaemon, dbClientFactory2, "db") + dest4Master.Agent.VREngine = vreplication.NewEngine(ts, "", nil, dest4Master.FakeMysqlDaemon, dbClientFactory2, "db") // select * from _vt.vreplication during Open dbClient2.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest4Master.Agent.VREngine.Open(context.Background()); err != nil { diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 5a1a2c47756..17697d21ee5 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -336,7 +336,7 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbclient } // Replace existing engine with a new one master.Agent.VREngine.Close() - master.Agent.VREngine = vreplication.NewEngine(tme.ts, "", master.FakeMysqlDaemon, dbClientFactory, dbclient.DBName()) + master.Agent.VREngine = vreplication.NewEngine(tme.ts, "", nil, master.FakeMysqlDaemon, dbClientFactory, dbclient.DBName()) if err := master.Agent.VREngine.Open(ctx); err != nil { t.Fatal(err) } @@ -347,7 +347,7 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbclient } // Replace existing engine with a new one master.Agent.VREngine.Close() - master.Agent.VREngine = vreplication.NewEngine(tme.ts, "", master.FakeMysqlDaemon, dbClientFactory, dbclient.DBName()) + master.Agent.VREngine = vreplication.NewEngine(tme.ts, "", nil, master.FakeMysqlDaemon, dbClientFactory, dbclient.DBName()) if err := master.Agent.VREngine.Open(ctx); err != nil { t.Fatal(err) }