From c2c8dabf1dd6f0175e9b89963cea2696984c8e21 Mon Sep 17 00:00:00 2001 From: Rafael Chacon Date: Wed, 11 Dec 2019 09:22:20 +0000 Subject: [PATCH 1/6] VDiff ad-hoc version for slack --- go/cmd/vttablet/vttablet.go | 2 + go/mysql/flavor_filepos.go | 23 +- go/vt/vttablet/tabletmanager/action_agent.go | 6 +- .../tabletmanager/init_tablet_test.go | 2 +- .../tabletmanager/vreplication/controller.go | 52 +- .../tabletmanager/vreplication/engine.go | 74 +- .../tabletmanager/vreplication/engine_test.go | 16 +- .../vreplication/framework_test.go | 2 +- .../tabletmanager/vreplication/vdiff.go | 817 ++++++++++++++++++ .../vreplication/vstreamer_client.go | 150 +++- .../vreplication/vstreamer_client_test.go | 12 +- go/vt/vttablet/tabletserver/api.go | 131 +++ .../vttablet/tabletserver/vstreamer/engine.go | 2 +- .../tabletserver/vstreamer/resultstreamer.go | 8 +- go/vt/wrangler/migrater_env_test.go | 4 +- .../testlib/migrate_served_from_test.go | 2 +- .../testlib/migrate_served_types_test.go | 12 +- 17 files changed, 1255 insertions(+), 60 deletions(-) create mode 100644 go/vt/vttablet/tabletmanager/vreplication/vdiff.go create mode 100644 go/vt/vttablet/tabletserver/api.go diff --git a/go/cmd/vttablet/vttablet.go b/go/cmd/vttablet/vttablet.go index c21f22ca388..899bc769afc 100644 --- a/go/cmd/vttablet/vttablet.go +++ b/go/cmd/vttablet/vttablet.go @@ -130,6 +130,8 @@ func main() { log.Exitf("NewActionAgent() failed: %v", err) } + tabletserver.InitAPI(agent.VREngine) + servenv.OnClose(func() { // stop the agent so that our topo entry gets pruned properly agent.Close() diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index fe1f0e54a83..15d0fc572e8 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -39,35 +39,20 @@ func newFilePosFlavor() flavor { // masterGTIDSet is part of the Flavor interface. 
func (flv *filePosFlavor) masterGTIDSet(c *Conn) (GTIDSet, error) { - qr, err := c.ExecuteFetch("SHOW SLAVE STATUS", 100, true /* wantfields */) + qr, err := c.ExecuteFetch("SHOW MASTER STATUS", 100, true /* wantfields */) if err != nil { return nil, err } if len(qr.Rows) == 0 { - qr, err = c.ExecuteFetch("SHOW MASTER STATUS", 100, true /* wantfields */) - if err != nil { - return nil, err - } - if len(qr.Rows) == 0 { - return nil, errors.New("no master or slave status") - } - resultMap, err := resultToMap(qr) - if err != nil { - return nil, err - } - return filePosGTID{ - file: resultMap["File"], - pos: resultMap["Position"], - }, nil + return nil, errors.New("no master or slave status") } - resultMap, err := resultToMap(qr) if err != nil { return nil, err } return filePosGTID{ - file: resultMap["Relay_Master_Log_File"], - pos: resultMap["Exec_Master_Log_Pos"], + file: resultMap["File"], + pos: resultMap["Position"], }, nil } diff --git a/go/vt/vttablet/tabletmanager/action_agent.go b/go/vt/vttablet/tabletmanager/action_agent.go index 675176cf2a1..453854361cc 100644 --- a/go/vt/vttablet/tabletmanager/action_agent.go +++ b/go/vt/vttablet/tabletmanager/action_agent.go @@ -311,7 +311,7 @@ func NewActionAgent( vreplication.InitVStreamerClient(agent.DBConfigs) // The db name is set by the Start function called above - agent.VREngine = vreplication.NewEngine(ts, tabletAlias.Cell, mysqld, func() binlogplayer.DBClient { + agent.VREngine = vreplication.NewEngine(ts, tabletAlias.GetCell(), agent.Tablet(), mysqld, func() binlogplayer.DBClient { return binlogplayer.NewDBClient(agent.DBConfigs.FilteredWithDB()) }, agent.DBConfigs.FilteredWithDB().DbName, @@ -382,7 +382,7 @@ func NewTestActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias * Cnf: nil, MysqlDaemon: mysqlDaemon, DBConfigs: &dbconfigs.DBConfigs{}, - VREngine: vreplication.NewEngine(ts, tabletAlias.Cell, mysqlDaemon, binlogplayer.NewFakeDBClient, ti.DbName()), + VREngine: vreplication.NewEngine(ts, 
tabletAlias.GetCell(), nil, mysqlDaemon, binlogplayer.NewFakeDBClient, ti.DbName()), History: history.New(historyLength), _healthy: fmt.Errorf("healthcheck not run yet"), } @@ -421,7 +421,7 @@ func NewComboActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias Cnf: nil, MysqlDaemon: mysqlDaemon, DBConfigs: dbcfgs, - VREngine: vreplication.NewEngine(nil, "", nil, nil, ""), + VREngine: vreplication.NewEngine(nil, "", nil, nil, nil, ""), gotMysqlPort: true, History: history.New(historyLength), _healthy: fmt.Errorf("healthcheck not run yet"), diff --git a/go/vt/vttablet/tabletmanager/init_tablet_test.go b/go/vt/vttablet/tabletmanager/init_tablet_test.go index cac9c0a0673..597d859ee47 100644 --- a/go/vt/vttablet/tabletmanager/init_tablet_test.go +++ b/go/vt/vttablet/tabletmanager/init_tablet_test.go @@ -194,7 +194,7 @@ func TestInitTablet(t *testing.T) { TabletAlias: tabletAlias, MysqlDaemon: mysqlDaemon, DBConfigs: &dbconfigs.DBConfigs{}, - VREngine: vreplication.NewEngine(nil, "", nil, nil, ""), + VREngine: vreplication.NewEngine(nil, "", nil, nil, nil, ""), batchCtx: ctx, History: history.New(historyLength), _healthy: fmt.Errorf("healthcheck not run yet"), diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 09d9fc29c3d..c038d9f5819 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -20,6 +20,7 @@ import ( "flag" "fmt" "strconv" + "sync" "time" "vitess.io/vitess/go/vt/discovery" @@ -44,6 +45,7 @@ var ( healthcheckRetryDelay = flag.Duration("vreplication_healthcheck_retry_delay", 5*time.Second, "healthcheck retry delay") healthcheckTimeout = flag.Duration("vreplication_healthcheck_timeout", 1*time.Minute, "healthcheck retry delay") retryDelay = flag.Duration("vreplication_retry_delay", 5*time.Second, "delay before retrying a failed binlog connection") + onlyOnceVdiff sync.Once ) // controller is 
created by Engine. Members are initialized upfront. @@ -140,6 +142,7 @@ func (ct *controller) run(ctx context.Context) { if err == nil { return } + // Sometimes, canceled contexts get wrapped as errors. select { case <-ctx.Done(): @@ -157,6 +160,53 @@ func (ct *controller) run(ctx context.Context) { } } +func (ct *controller) runVDiff(ctx context.Context) (err error) { + defer func() { + ct.sourceTablet.Set("") + if x := recover(); x != nil { + log.Errorf("stream %v: caught panic: %v\n%s", ct.id, x, tb.Stack(4)) + err = fmt.Errorf("panic: %v", x) + } + }() + + select { + case <-ctx.Done(): + return nil + default: + } + + dbClient := ct.dbClientFactory() + if err := dbClient.Connect(); err != nil { + return vterrors.Wrap(err, "can't connect to database") + } + defer dbClient.Close() + + var tablet *topodatapb.Tablet + if ct.source.GetExternalMysql() == "" { + log.Infof("trying to find a tablet eligible for vreplication. stream id: %v", ct.id) + tablet, err = ct.tabletPicker.PickForStreaming(ctx) + if err != nil { + return err + } + log.Infof("found a tablet eligible for vreplication. 
stream id: %v tablet: %s", ct.id, tablet.Alias.String()) + ct.sourceTablet.Set(tablet.Alias.String()) + } + + switch { + case ct.source.Filter != nil: + var vsClient VStreamerClient + if ct.source.GetExternalMysql() == "" { + vsClient = NewTabletVStreamerClient(tablet, ct.mysqld) + } else { + vsClient = NewMySQLVStreamerClient() + } + + vd := newVDiffer(ct.id, &ct.source, vsClient, ct.blpStats, dbClient, ct.vre, ct.workflow) + return vd.VDiff(ctx, 60*time.Second) + } + return fmt.Errorf("missing source") +} + func (ct *controller) runBlp(ctx context.Context) (err error) { defer func() { ct.sourceTablet.Set("") @@ -222,7 +272,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { var vsClient VStreamerClient if ct.source.GetExternalMysql() == "" { - vsClient = NewTabletVStreamerClient(tablet) + vsClient = NewTabletVStreamerClient(tablet, nil) } else { vsClient = NewMySQLVStreamerClient() } diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index a9c1678fde3..87bebd3c648 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -17,6 +17,7 @@ limitations under the License. 
package vreplication import ( + "encoding/json" "errors" "flag" "fmt" @@ -29,9 +30,11 @@ import ( "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" + "vitess.io/vitess/go/vt/topo" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" - "vitess.io/vitess/go/vt/topo" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) const ( @@ -79,12 +82,19 @@ type Engine struct { cancel context.CancelFunc ts *topo.Server + tablet *topodatapb.Tablet cell string mysqld mysqlctl.MysqlDaemon dbClientFactory func() binlogplayer.DBClient dbName string journaler map[string]*journalEvent + + // VDiff Hack + isVDiffRunning bool + vdiffCancel context.CancelFunc + vdiffMu sync.Mutex + vdiffError error } type journalEvent struct { @@ -94,10 +104,11 @@ type journalEvent struct { // NewEngine creates a new Engine. // A nil ts means that the Engine is disabled. -func NewEngine(ts *topo.Server, cell string, mysqld mysqlctl.MysqlDaemon, dbClientFactory func() binlogplayer.DBClient, dbName string) *Engine { +func NewEngine(ts *topo.Server, cell string, tablet *topodatapb.Tablet, mysqld mysqlctl.MysqlDaemon, dbClientFactory func() binlogplayer.DBClient, dbName string) *Engine { vre := &Engine{ controllers: make(map[int]*controller), ts: ts, + tablet: tablet, cell: cell, mysqld: mysqld, dbClientFactory: dbClientFactory, @@ -212,6 +223,65 @@ func (vre *Engine) IsOpen() bool { return vre.isOpen } +func (vre *Engine) RunVdiff() { + vre.vdiffMu.Lock() + defer vre.vdiffMu.Unlock() + if vre.isVDiffRunning { + return + } + vre.isVDiffRunning = true + vre.vdiffError = nil + var ctx context.Context + ctx, vre.vdiffCancel = context.WithCancel(context.Background()) + go func() { + if len(vre.controllers) != 1 { + log.Infof("There are no replication streams, nothing to diff") + return + } + // This is using the vreplication id that stars at 1 + ctrl := vre.controllers[1] + if ctrl == nil { + 
log.Infof("VReplication ctrl is nil, this shouldn't happen") + } + diffError := ctrl.runVDiff(ctx) + vre.vdiffMu.Lock() + defer vre.vdiffMu.Unlock() + vre.vdiffError = diffError + vre.isVDiffRunning = false + }() + return +} + +func (vre *Engine) AbortVdiff() { + vre.vdiffMu.Lock() + defer vre.vdiffMu.Unlock() + if !vre.isVDiffRunning { + return + } + vre.isVDiffRunning = false + vre.vdiffCancel() + return +} + +func (vre *Engine) VDiffReportStatus() ([]byte, error) { + vre.vdiffMu.Lock() + defer vre.vdiffMu.Unlock() + resp := make(map[string]interface{}) + if vre.isVDiffRunning { + resp["status"] = "running" + } else { + resp["status"] = "not_running" + } + if vre.vdiffError != nil { + resp["last_error"] = vre.vdiffError.Error() + return json.MarshalIndent(resp, "", " ") + } + if VDiffStatus() != nil { + resp["last_diff"] = VDiffStatus() + } + return json.MarshalIndent(resp, "", " ") +} + // Close closes the Engine service. func (vre *Engine) Close() { vre.mu.Lock() diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go index 6377be170a1..814eea20e3a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go @@ -41,7 +41,7 @@ func TestEngineOpen(t *testing.T) { // Test Insert - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) if vre.IsOpen() { t.Errorf("IsOpen: %v, want false", vre.IsOpen()) } @@ -89,7 +89,7 @@ func TestEngineExec(t *testing.T) { // Test Insert - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); 
err != nil { @@ -249,7 +249,7 @@ func TestEngineBadInsert(t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); err != nil { @@ -279,7 +279,7 @@ func TestEngineSelect(t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); err != nil { @@ -314,7 +314,7 @@ func TestWaitForPos(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} dbClientFactory := func() binlogplayer.DBClient { return dbClient } - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); err != nil { @@ -344,7 +344,7 @@ func TestWaitForPosError(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} dbClientFactory := func() binlogplayer.DBClient { return dbClient } - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := 
NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) err := vre.WaitForPos(context.Background(), 1, "MariaDB/0-1-1084") want := `vreplication engine is closed` @@ -386,7 +386,7 @@ func TestWaitForPosCancel(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} dbClientFactory := func() binlogplayer.DBClient { return dbClient } - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); err != nil { @@ -434,7 +434,7 @@ func TestCreateDBAndTable(t *testing.T) { // Test Insert - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) + vre := NewEngine(env.TopoServ, env.Cells[0], nil, mysqld, dbClientFactory, dbClient.DBName()) tableNotFound := mysql.SQLError{Num: 1146, Message: "table not found"} dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", nil, &tableNotFound) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 24853187a62..75486ab5d39 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -99,7 +99,7 @@ func TestMain(m *testing.M) { InitVStreamerClient(env.Dbcfgs) - playerEngine = NewEngine(env.TopoServ, env.Cells[0], env.Mysqld, realDBClientFactory, vrepldb) + playerEngine = NewEngine(env.TopoServ, env.Cells[0], nil, env.Mysqld, realDBClientFactory, vrepldb) if err := playerEngine.Open(context.Background()); err != nil { fmt.Fprintf(os.Stderr, "%v", err) return 1 diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdiff.go 
b/go/vt/vttablet/tabletmanager/vreplication/vdiff.go new file mode 100644 index 00000000000..d0b5d0c2a54 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vreplication/vdiff.go @@ -0,0 +1,817 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/golang/protobuf/proto" + "golang.org/x/net/context" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/srvtopo" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/engine" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +// vdiff provides the core logic to start vreplication streams +type vdiff struct { + id uint32 + dbClient *vdbClient + // source + source *binlogdatapb.BinlogSource + sourceVStreamer VStreamerClient + targetVStreamer VStreamerClient + + stats *binlogplayer.Stats + vre *Engine + + differs map[string]*tableDiffer + sourceDf *dfParams + targetDf *dfParams + + tmc tmclient.TabletManagerClient + workflow 
string + + reportMu sync.Mutex + totalSummary DiffReport + diffReports map[string]*DiffReport +} + +// newVDiffer creates a new vreplicator +func newVDiffer(id uint32, source *binlogdatapb.BinlogSource, sourceVStreamer VStreamerClient, stats *binlogplayer.Stats, dbClient binlogplayer.DBClient, vre *Engine, workflow string) *vdiff { + return &vdiff{ + id: id, + source: source, + sourceVStreamer: sourceVStreamer, + stats: stats, + dbClient: newVDBClient(dbClient, stats), + vre: vre, + tmc: tmclient.NewTabletManagerClient(), + workflow: workflow, + diffReports: make(map[string]*DiffReport), + } +} + +type DatabaseReport struct { + GlobalSummary DiffReport `json:"total_summary,omitempty"` + TablesReport map[string]*DiffReport `json:"table_diffs,omitempty"` +} + +// DiffReport is the summary of differences for one table. +type DiffReport struct { + ProcessedRows int + MatchingRows int + MismatchedRows int + ExtraRowsSource int + ExtraRowsTarget int +} + +type tableDiffer struct { + targetTable string + sourceExpression string + targetExpression string + compareCols []int + comparePKs []int + comparePKNames []string + sourcePrimitive engine.Primitive + targetPrimitive engine.Primitive +} + +type dfParams struct { + master *topo.TabletInfo + vstreamer VStreamerClient + position mysql.Position + snapshotPosition string + result chan *sqltypes.Result + err error +} + +var currentDatabaseReport *DatabaseReport + +// Replicate starts a vreplication stream. 
+func (df *vdiff) VDiff(ctx context.Context, filteredReplicationWaitTime time.Duration) error { + df.targetVStreamer = NewTabletVStreamerClient(df.vre.tablet, df.vre.mysqld) + + df.sourceVStreamer.Open(ctx) + df.targetVStreamer.Open(ctx) + defer func() { + df.sourceVStreamer.Close(context.Background()) + df.targetVStreamer.Close(context.Background()) + }() + + tablet, err := df.vre.ts.GetTablet(ctx, df.vre.tablet.GetAlias()) + if err != nil { + return err + } + + targetShard, err := df.vre.ts.GetShard(ctx, tablet.GetKeyspace(), tablet.GetShard()) + if err != nil { + return err + } + + targetMaster, err := df.vre.ts.GetTablet(ctx, targetShard.MasterAlias) + if err != nil { + return err + } + + df.sourceDf = &dfParams{ + vstreamer: df.sourceVStreamer, + } + + df.targetDf = &dfParams{ + master: targetMaster, + vstreamer: df.targetVStreamer, + } + + schm, err := df.getSchema(ctx, df.vre.tablet.GetAlias(), nil, nil, false) + + if err != nil { + return vterrors.Wrap(err, "GetSchema") + } + + df.differs, err = buildVDiffPlan(ctx, df.source.Filter, schm) + if err != nil { + return err + } + + defer func() { + if err := df.restartTarget(context.Background()); err != nil { + log.Error("Could not restart workflow %s: %v, please restart it manually", err) + } + }() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + for table, td := range df.differs { + if err := df.stopTarget(ctx); err != nil { + return vterrors.Wrap(err, "stopTargets") + } + sourceReader, err := df.startQueryStreams(ctx, df.sourceDf, td.sourceExpression, filteredReplicationWaitTime) + if err != nil { + return vterrors.Wrap(err, "startQueryStreams(sources)") + } + if err := df.syncTargets(ctx, filteredReplicationWaitTime); err != nil { + return vterrors.Wrap(err, "syncTargets") + } + targetReader, err := df.startQueryStreams(ctx, df.targetDf, td.targetExpression, filteredReplicationWaitTime) + if err != nil { + return vterrors.Wrap(err, "startQueryStreams(targets)") + } + if err := 
df.restartTarget(ctx); err != nil { + return vterrors.Wrap(err, "restartTarget") + } + dr, err := td.diff(ctx, sourceReader, targetReader) + if err != nil { + return vterrors.Wrap(err, "diff") + } + log.Infof("Summary for %v: %+v\n", td.targetTable, *dr) + func() { + df.reportMu.Lock() + defer df.reportMu.Unlock() + df.totalSummary.MatchingRows += dr.MatchingRows + df.totalSummary.ProcessedRows += dr.ProcessedRows + df.totalSummary.MismatchedRows += dr.MismatchedRows + df.totalSummary.ExtraRowsSource += dr.ExtraRowsSource + df.totalSummary.ExtraRowsTarget += dr.ExtraRowsTarget + df.diffReports[table] = dr + currentDatabaseReport = &DatabaseReport{ + GlobalSummary: df.totalSummary, + TablesReport: df.diffReports, + } + }() + } + log.Infof("Total Diffs: processed: %v, matched: %v, extra_rows_source: %v, extra_rows_target: %v, mistmatched_rows: %v", df.totalSummary.ProcessedRows, df.totalSummary.MatchingRows, df.totalSummary.ExtraRowsSource, df.totalSummary.ExtraRowsTarget, df.totalSummary.MismatchedRows) + return nil +} + +func VDiffStatus() *DatabaseReport { + return currentDatabaseReport +} + +func buildVDiffPlan(ctx context.Context, filter *binlogdatapb.Filter, schm *tabletmanagerdatapb.SchemaDefinition) (map[string]*tableDiffer, error) { + differs := make(map[string]*tableDiffer) + for _, table := range schm.TableDefinitions { + rule, err := MatchTable(table.Name, filter) + if err != nil { + return nil, err + } + if rule == nil { + continue + } + query := rule.Filter + if rule.Filter == "" || key.IsKeyRange(rule.Filter) { + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("select * from %v", sqlparser.NewTableIdent(table.Name)) + query = buf.String() + } + differs[table.Name], err = buildDifferPlan(table, query) + if err != nil { + return nil, err + } + } + return differs, nil +} + +func buildDifferPlan(table *tabletmanagerdatapb.TableDefinition, query string) (*tableDiffer, error) { + statement, err := sqlparser.Parse(query) + if err != nil { + return nil, 
err + } + sel, ok := statement.(*sqlparser.Select) + if !ok { + return nil, fmt.Errorf("unexpected: %v", sqlparser.String(statement)) + } + td := &tableDiffer{ + targetTable: table.Name, + } + sourceSelect := &sqlparser.Select{} + targetSelect := &sqlparser.Select{} + var aggregates []engine.AggregateParams + for _, selExpr := range sel.SelectExprs { + switch selExpr := selExpr.(type) { + case *sqlparser.StarExpr: + for _, fld := range table.Fields { + aliased := &sqlparser.AliasedExpr{Expr: &sqlparser.ColName{Name: sqlparser.NewColIdent(fld.Name)}} + sourceSelect.SelectExprs = append(sourceSelect.SelectExprs, aliased) + targetSelect.SelectExprs = append(targetSelect.SelectExprs, aliased) + } + case *sqlparser.AliasedExpr: + var targetCol *sqlparser.ColName + if !selExpr.As.IsEmpty() { + targetCol = &sqlparser.ColName{Name: selExpr.As} + } else { + if colAs, ok := selExpr.Expr.(*sqlparser.ColName); ok { + targetCol = colAs + } else { + return nil, fmt.Errorf("expression needs an alias: %v", sqlparser.String(selExpr)) + } + } + sourceSelect.SelectExprs = append(sourceSelect.SelectExprs, selExpr) + targetSelect.SelectExprs = append(targetSelect.SelectExprs, &sqlparser.AliasedExpr{Expr: targetCol}) + + // Check if it's an aggregate expression + if expr, ok := selExpr.Expr.(*sqlparser.FuncExpr); ok { + switch fname := expr.Name.Lowered(); fname { + case "count", "sum": + aggregates = append(aggregates, engine.AggregateParams{ + Opcode: engine.SupportedAggregates[fname], + Col: len(sourceSelect.SelectExprs) - 1, + }) + } + } + default: + return nil, fmt.Errorf("unexpected: %v", sqlparser.String(statement)) + } + } + fields := make(map[string]querypb.Type) + for _, field := range table.Fields { + fields[strings.ToLower(field.Name)] = field.Type + } + + td.compareCols = make([]int, len(sourceSelect.SelectExprs)) + for i := range td.compareCols { + colname := targetSelect.SelectExprs[i].(*sqlparser.AliasedExpr).Expr.(*sqlparser.ColName).Name.Lowered() + typ, ok := 
fields[colname] + if !ok { + return nil, fmt.Errorf("column %v not found in table %v", colname, table.Name) + } + td.compareCols[i] = i + if sqltypes.IsText(typ) { + sourceSelect.SelectExprs = append(sourceSelect.SelectExprs, wrapWeightString(sourceSelect.SelectExprs[i])) + targetSelect.SelectExprs = append(targetSelect.SelectExprs, wrapWeightString(targetSelect.SelectExprs[i])) + td.compareCols[i] = len(sourceSelect.SelectExprs) - 1 + } + } + + sourceSelect.From = sel.From + targetSelect.From = sqlparser.TableExprs{ + &sqlparser.AliasedTableExpr{ + Expr: &sqlparser.TableName{ + Name: sqlparser.NewTableIdent(table.Name), + }, + }, + } + + var orderby sqlparser.OrderBy + for _, pk := range table.PrimaryKeyColumns { + found := false + for i, selExpr := range targetSelect.SelectExprs { + colname := selExpr.(*sqlparser.AliasedExpr).Expr.(*sqlparser.ColName).Name.Lowered() + if pk == colname { + td.comparePKs = append(td.comparePKs, td.compareCols[i]) + td.comparePKNames = append(td.comparePKNames, colname) + // We'll be comparing pks seperately. So, remove them from compareCols. + td.compareCols[i] = -1 + found = true + break + } + } + if !found { + // Unreachable. 
+ return nil, fmt.Errorf("column %v not found in table %v", pk, table.Name) + } + orderby = append(orderby, &sqlparser.Order{ + Expr: &sqlparser.ColName{Name: sqlparser.NewColIdent(pk)}, + Direction: sqlparser.AscScr, + }) + } + sourceSelect.Where = removeKeyrange(sel.Where) + sourceSelect.GroupBy = sel.GroupBy + sourceSelect.OrderBy = orderby + + targetSelect.OrderBy = orderby + + td.sourceExpression = sqlparser.String(sourceSelect) + td.targetExpression = sqlparser.String(targetSelect) + + td.sourcePrimitive = newMergeSorter(td.comparePKs) + td.targetPrimitive = newMergeSorter(td.comparePKs) + if len(aggregates) != 0 { + td.sourcePrimitive = &engine.OrderedAggregate{ + Aggregates: aggregates, + Keys: td.comparePKs, + Input: td.sourcePrimitive, + } + } + + return td, nil +} + +func (df *vdiff) startQueryStreams(ctx context.Context, participant *dfParams, query string, filteredReplicationWaitTime time.Duration) (*resultReader, error) { + waitCtx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime) + defer cancel() + // Iteration for each participant. + + if err := participant.vstreamer.WaitForPosition(waitCtx, mysql.EncodePosition(participant.position)); err != nil { + return nil, vterrors.Wrapf(err, "WaitForPosition for tablet %v", participant.vstreamer) + } + participant.result = make(chan *sqltypes.Result, 1) + gtidch := make(chan string, 1) + + // Start the stream in a separate goroutine. + go df.streamOne(ctx, participant, query, gtidch) + + // Wait for the gtid to be sent. If it's not received, there was an error + // which would be stored in participant.err. + gtid, ok := <-gtidch + if !ok { + return nil, participant.err + } + // Save the new position, as of when the query executed. + participant.snapshotPosition = gtid + return newResultReader(ctx, participant), nil +} + +// streamOne is called as a goroutine, and communicates its results through channels. +// It first sends the snapshot gtid to gtidch. 
+// Then it streams results to participant.result. +// Before returning, it sets participant.err, and closes all channels. +// If any channel is closed, then participant.err can be checked if there was an error. +func (df *vdiff) streamOne(ctx context.Context, participant *dfParams, query string, gtidch chan string) { + defer close(participant.result) + defer close(gtidch) + + // Wrap the streaming in a separate function so we can capture the error. + // This shows that the error will be set before the channels are closed. + participant.err = func() error { + var fields []*querypb.Field + err := participant.vstreamer.VStreamResults(ctx, query, func(vrs *binlogdatapb.VStreamResultsResponse) error { + if vrs.Fields != nil { + fields = vrs.Fields + gtidch <- vrs.Gtid + } + p3qr := &querypb.QueryResult{ + Fields: fields, + Rows: vrs.Rows, + } + result := sqltypes.Proto3ToResult(p3qr) + // Fields should be received only once, and sent only once. + if vrs.Fields == nil { + result.Fields = nil + } + select { + case participant.result <- result: + case <-ctx.Done(): + return vterrors.Wrap(ctx.Err(), "VStreamResults") + } + return nil + }) + return err + }() +} + +func (df *vdiff) syncTargets(ctx context.Context, filteredReplicationWaitTime time.Duration) error { + waitCtx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime) + defer cancel() + pos := df.sourceDf.snapshotPosition + query := fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for vdiff' where id=%d", pos, df.id) + if _, err := df.tmc.VReplicationExec(ctx, df.targetDf.master.Tablet, query); err != nil { + return err + } + + log.Infof("VReplication stopped") + if err := df.tmc.VReplicationWaitForPos(waitCtx, df.targetDf.master.Tablet, int(df.id), pos); err != nil { + return vterrors.Wrapf(err, "VReplicationWaitForPos for tablet %v", topoproto.TabletAliasString(df.targetDf.master.Tablet.Alias)) + } + + pos, err := df.tmc.MasterPosition(ctx, 
df.targetDf.master.Tablet) + if err != nil { + return err + } + mpos, err := mysql.DecodePosition(pos) + if err != nil { + return err + } + df.targetDf.position = mpos + return nil +} + +func (df *vdiff) restartTarget(ctx context.Context) error { + query := fmt.Sprintf("update _vt.vreplication set state='Running', message='', stop_pos='' where db_name=%s and workflow=%s", encodeString(df.targetDf.master.DbName()), encodeString(df.workflow)) + _, err := df.tmc.VReplicationExec(ctx, df.targetDf.master.Tablet, query) + return err +} + +func (df *vdiff) getSchema(ctx context.Context, tabletAlias *topodatapb.TabletAlias, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { + ti, err := df.vre.ts.GetTablet(ctx, tabletAlias) + if err != nil { + return nil, fmt.Errorf("GetTablet(%v) failed: %v", tabletAlias, err) + } + + return df.tmc.GetSchema(ctx, ti.Tablet, tables, excludeTables, includeViews) +} + +//----------------------------------------------------------------- +// primitiveExecutor + +type primitiveExecutor struct { + prim engine.Primitive + rows [][]sqltypes.Value + resultch chan *sqltypes.Result + err error +} + +func newPrimitiveExecutor(ctx context.Context, vcursor engine.VCursor, prim engine.Primitive) *primitiveExecutor { + pe := &primitiveExecutor{ + prim: prim, + resultch: make(chan *sqltypes.Result, 1), + } + go func() { + defer close(pe.resultch) + pe.err = pe.prim.StreamExecute(vcursor, make(map[string]*querypb.BindVariable), false, func(qr *sqltypes.Result) error { + select { + case pe.resultch <- qr: + case <-ctx.Done(): + return vterrors.Wrap(ctx.Err(), "Outer Stream") + } + return nil + }) + }() + return pe +} + +func (pe *primitiveExecutor) next() ([]sqltypes.Value, error) { + for len(pe.rows) == 0 { + qr, ok := <-pe.resultch + if !ok { + return nil, pe.err + } + pe.rows = qr.Rows + } + + row := pe.rows[0] + pe.rows = pe.rows[1:] + return row, nil +} + +func (pe *primitiveExecutor) drain(ctx 
context.Context) (int, error) { + count := 0 + for { + row, err := pe.next() + if err != nil { + return 0, err + } + if row == nil { + return count, nil + } + count++ + } +} + +//----------------------------------------------------------------- +// mergeSorter + +var _ engine.Primitive = (*mergeSorter)(nil) + +// mergeSorter performs a merge-sorted read from the participants. +type mergeSorter struct { + engine.Primitive + orderBy []engine.OrderbyParams +} + +func newMergeSorter(comparePKs []int) *mergeSorter { + ob := make([]engine.OrderbyParams, 0, len(comparePKs)) + for _, col := range comparePKs { + ob = append(ob, engine.OrderbyParams{Col: col}) + } + return &mergeSorter{ + orderBy: ob, + } +} + +func (ms *mergeSorter) StreamExecute(vcursor engine.VCursor, bindVars map[string]*querypb.BindVariable, wantields bool, callback func(*sqltypes.Result) error) error { + // TODO: I don't really need to do a merge sort here, is a single stream, but I'm lazy and don't want to think. + _, ok := vcursor.(*resultReader) + if !ok { + return fmt.Errorf("internal error: vcursor is not a resultReader: %T", vcursor) + } + rss := make([]*srvtopo.ResolvedShard, 0, 1) + bvs := make([]map[string]*querypb.BindVariable, 0, 1) + rss = append(rss, &srvtopo.ResolvedShard{ + Target: &querypb.Target{ + Shard: "-", + }, + }) + bvs = append(bvs, bindVars) + return engine.MergeSort(vcursor, "", ms.orderBy, rss, bvs, callback) +} + +//----------------------------------------------------------------- +// resultReader + +// resultReader acts as a VCursor for the wrapping primitives. 
+type resultReader struct { + engine.VCursor + ctx context.Context + participant *dfParams +} + +func newResultReader(ctx context.Context, participant *dfParams) *resultReader { + return &resultReader{ + ctx: ctx, + participant: participant, + } +} + +func (rr *resultReader) Context() context.Context { + return rr.ctx +} + +func (rr *resultReader) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error { + for result := range rr.participant.result { + if err := callback(result); err != nil { + return err + } + } + return nil +} + +//----------------------------------------------------------------- +// tableDiffer + +func (td *tableDiffer) diff(ctx context.Context, sourceReader, targetReader *resultReader) (*DiffReport, error) { + sourceExecutor := newPrimitiveExecutor(ctx, sourceReader, td.sourcePrimitive) + targetExecutor := newPrimitiveExecutor(ctx, targetReader, td.targetPrimitive) + dr := &DiffReport{} + var sourceRow, targetRow []sqltypes.Value + var err error + advanceSource := true + advanceTarget := true + for { + if advanceSource { + sourceRow, err = sourceExecutor.next() + if err != nil { + return nil, err + } + } + if advanceTarget { + targetRow, err = targetExecutor.next() + if err != nil { + return nil, err + } + } + + if sourceRow == nil && targetRow == nil { + return dr, nil + } + + advanceSource = true + advanceTarget = true + + if sourceRow == nil { + // drain target, update count + pk := td.pkString(targetRow) + log.Errorf("Draining extra row(s) found on the target. 
This is the extra row pk: %v", pk) + count, err := targetExecutor.drain(ctx) + if err != nil { + return nil, err + } + dr.ExtraRowsTarget += 1 + count + dr.ProcessedRows += 1 + count + return dr, nil + } + if targetRow == nil { + // no more rows from the target + // we know we have rows from source, drain, update count + pk := td.pkString(targetRow) + log.Errorf("Draining extra row(s) found on the source. This is the extra row pk: %v", pk) + count, err := sourceExecutor.drain(ctx) + if err != nil { + return nil, err + } + dr.ExtraRowsSource += 1 + count + dr.ProcessedRows += 1 + count + return dr, nil + } + + dr.ProcessedRows++ + + // Compare pk values. + c, _, err := td.compare(sourceRow, targetRow, td.comparePKs) + switch { + case err != nil: + return nil, err + case c < 0: + if dr.ExtraRowsSource < 10 { + log.Errorf("[table=%v] Extra row %v on source: %v", td.targetTable, dr.ExtraRowsSource, td.pkString(sourceRow)) + } + dr.ExtraRowsSource++ + advanceTarget = false + continue + case c > 0: + if dr.ExtraRowsTarget < 10 { + log.Errorf("[table=%v] Extra row %v on target: %v", td.targetTable, dr.ExtraRowsTarget, td.pkString(targetRow)) + } + dr.ExtraRowsTarget++ + advanceSource = false + continue + } + + // c == 0 + // Compare non-pk values. 
+ c, _, err = td.compare(sourceRow, targetRow, td.compareCols) + switch { + case err != nil: + return nil, err + case c != 0: + if dr.MismatchedRows < 10 { + log.Errorf("[table=%v] Different content for PK: %v", td.targetTable, td.pkString(sourceRow)) + } + dr.MismatchedRows++ + default: + dr.MatchingRows++ + } + } +} + +func (td *tableDiffer) pkString(targetRow []sqltypes.Value) string { + pk := "" + for index, col := range td.comparePKs { + if col == -1 { + continue + } + pk += fmt.Sprintf("%v:%v", td.comparePKNames[index], targetRow[col].String()) + if index < len(td.comparePKs)-1 { + pk += ", " + } + } + return pk +} + +func (td *tableDiffer) compare(sourceRow, targetRow []sqltypes.Value, cols []int) (int, int, error) { + for _, col := range cols { + if col == -1 { + continue + } + c, err := sqltypes.NullsafeCompare(sourceRow[col], targetRow[col]) + if err != nil { + return 0, 0, err + } + if c != 0 { + return c, col, nil + } + } + return 0, 0, nil +} + +func removeKeyrange(where *sqlparser.Where) *sqlparser.Where { + if where == nil { + return nil + } + if isFuncKeyrange(where.Expr) { + return nil + } + where.Expr = removeExprKeyrange(where.Expr) + return where +} + +func removeExprKeyrange(node sqlparser.Expr) sqlparser.Expr { + switch node := node.(type) { + case *sqlparser.AndExpr: + if isFuncKeyrange(node.Left) { + return removeExprKeyrange(node.Right) + } + if isFuncKeyrange(node.Right) { + return removeExprKeyrange(node.Left) + } + return &sqlparser.AndExpr{ + Left: removeExprKeyrange(node.Left), + Right: removeExprKeyrange(node.Right), + } + case *sqlparser.ParenExpr: + return &sqlparser.ParenExpr{ + Expr: removeExprKeyrange(node.Expr), + } + } + return node +} + +func isFuncKeyrange(expr sqlparser.Expr) bool { + funcExpr, ok := expr.(*sqlparser.FuncExpr) + return ok && funcExpr.Name.EqualString("in_keyrange") +} + +func wrapWeightString(expr sqlparser.SelectExpr) *sqlparser.AliasedExpr { + return &sqlparser.AliasedExpr{ + Expr: &sqlparser.FuncExpr{ + 
Name: sqlparser.NewColIdent("weight_string"), + Exprs: []sqlparser.SelectExpr{ + &sqlparser.AliasedExpr{ + Expr: expr.(*sqlparser.AliasedExpr).Expr, + }, + }, + }, + } +} + +func (df *vdiff) stopTarget(ctx context.Context) error { + var mu sync.Mutex + + query := fmt.Sprintf("update _vt.vreplication set state='Stopped', message='for vdiff' where db_name=%s and workflow=%s", encodeString(df.targetDf.master.DbName()), encodeString(df.workflow)) + _, err := df.tmc.VReplicationExec(ctx, df.targetDf.master.Tablet, query) + if err != nil { + return err + } + query = fmt.Sprintf("select source, pos from _vt.vreplication where db_name=%s and workflow=%s", encodeString(df.targetDf.master.DbName()), encodeString(df.workflow)) + p3qr, err := df.tmc.VReplicationExec(ctx, df.targetDf.master.Tablet, query) + if err != nil { + return err + } + qr := sqltypes.Proto3ToResult(p3qr) + + for _, row := range qr.Rows { + var bls binlogdatapb.BinlogSource + if err := proto.UnmarshalText(row[0].ToString(), &bls); err != nil { + return err + } + pos, err := mysql.DecodePosition(row[1].ToString()) + if err != nil { + return err + } + func() { + mu.Lock() + defer mu.Unlock() + + // if bls.Shard != df.shard { + // // Unreachable. 
+ // return + // } + if !df.sourceDf.position.IsZero() && df.sourceDf.position.AtLeast(pos) { + return + } + df.sourceDf.position = pos + }() + } + return nil +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go b/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go index 3f303238208..1c5a383970c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletconn" @@ -58,6 +59,12 @@ type VStreamerClient interface { // VStreamRows streams rows of a table from the specified starting point. VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error + + // VStreamResults streams results along with the gtid of the snapshot. + VStreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error + + // WaitForPosition ... 
+ WaitForPosition(ctx context.Context, pos string) error } // TabletVStreamerClient a vstream client backed by vttablet @@ -68,6 +75,7 @@ type TabletVStreamerClient struct { isOpen bool tablet *topodatapb.Tablet + mysqld mysqlctl.MysqlDaemon target *querypb.Target tsQueryService queryservice.QueryService } @@ -79,14 +87,15 @@ type MySQLVStreamerClient struct { isOpen bool - sourceConnParams *mysql.ConnParams - sourceSe *schema.Engine + sourceCp *mysql.ConnParams + sourceSe *schema.Engine } // NewTabletVStreamerClient creates a new TabletVStreamerClient -func NewTabletVStreamerClient(tablet *topodatapb.Tablet) *TabletVStreamerClient { +func NewTabletVStreamerClient(tablet *topodatapb.Tablet, mysqld mysqlctl.MysqlDaemon) *TabletVStreamerClient { return &TabletVStreamerClient{ tablet: tablet, + mysqld: mysqld, target: &querypb.Target{ Keyspace: tablet.Keyspace, Shard: tablet.Shard, @@ -135,6 +144,24 @@ func (vsClient *TabletVStreamerClient) VStreamRows(ctx context.Context, query st return vsClient.tsQueryService.VStreamRows(ctx, vsClient.target, query, lastpk, send) } +// VStreamResults part of the VStreamerClient interface +func (vsClient *TabletVStreamerClient) VStreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error { + if !vsClient.isOpen { + return errors.New("Can't VStreamRows without opening client") + } + vsClient.target.TabletType = topodatapb.TabletType_MASTER + return vsClient.tsQueryService.VStreamResults(ctx, vsClient.target, query, send) +} + +// WaitForPosition ... +func (vsClient *TabletVStreamerClient) WaitForPosition(ctx context.Context, pos string) error { + targetPos, err := mysql.DecodePosition(pos) + if err != nil { + return err + } + return vsClient.mysqld.WaitMasterPos(ctx, targetPos) +} + // NewMySQLVStreamerClient is a vstream client that allows you to stream directly from MySQL. // In order to achieve this, the following creates a vstreamer Engine with a dummy in memorytopo. 
func NewMySQLVStreamerClient() *MySQLVStreamerClient { @@ -144,7 +171,7 @@ func NewMySQLVStreamerClient() *MySQLVStreamerClient { // TODO: For now external mysql streams can only be used with ExternalReplWithDB creds. // In the future we will support multiple users. vsClient := &MySQLVStreamerClient{ - sourceConnParams: dbcfgs.ExternalReplWithDB(), + sourceCp: dbcfgs.ExternalReplWithDB(), } return vsClient } @@ -161,7 +188,7 @@ func (vsClient *MySQLVStreamerClient) Open(ctx context.Context) (err error) { // Let's create all the required components by vstreamer vsClient.sourceSe = schema.NewEngine(checker{}, tabletenv.DefaultQsConfig) - vsClient.sourceSe.InitDBConfig(vsClient.sourceConnParams) + vsClient.sourceSe.InitDBConfig(vsClient.sourceCp) err = vsClient.sourceSe.Open() if err != nil { return err @@ -187,7 +214,7 @@ func (vsClient *MySQLVStreamerClient) VStream(ctx context.Context, startPos stri if !vsClient.isOpen { return errors.New("can't VStream without opening client") } - streamer := vstreamer.NewVStreamer(ctx, vsClient.sourceConnParams, vsClient.sourceSe, startPos, filter, &vindexes.KeyspaceSchema{}, send) + streamer := vstreamer.NewVStreamer(ctx, vsClient.sourceCp, vsClient.sourceSe, startPos, filter, &vindexes.KeyspaceSchema{}, send) return streamer.Stream() } @@ -204,10 +231,119 @@ func (vsClient *MySQLVStreamerClient) VStreamRows(ctx context.Context, query str } row = r.Rows[0] } - streamer := vstreamer.NewRowStreamer(ctx, vsClient.sourceConnParams, vsClient.sourceSe, query, row, &vindexes.KeyspaceSchema{}, send) + + streamer := vstreamer.NewRowStreamer(ctx, vsClient.sourceCp, vsClient.sourceSe, query, row, &vindexes.KeyspaceSchema{}, send) return streamer.Stream() } +// VStreamResults part of the VStreamerClient interface +func (vsClient *MySQLVStreamerClient) VStreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error { + if !vsClient.isOpen { + return errors.New("Can't VStreamRows without opening 
client") + } + + streamer := vstreamer.NewResultStreamer(ctx, vsClient.sourceCp, query, send) + return streamer.Stream() +} + +// WaitForPosition returns the master position +func (vsClient *MySQLVStreamerClient) WaitForPosition(ctx context.Context, pos string) error { + targetPos, err := mysql.DecodePosition(pos) + if err != nil { + return err + } + + // Get a connection. + params, err := dbconfigs.WithCredentials(vsClient.sourceCp) + if err != nil { + return err + } + conn, err := mysql.Connect(ctx, params) + if err != nil { + return fmt.Errorf("error in connecting to mysql db, err %v", err) + } + + defer conn.Close() + + // If we are the master, WaitUntilPositionCommand will fail. + // But position is most likely reached. So, check the position + // first. + mpos, err := conn.MasterPosition() + if err != nil { + return fmt.Errorf("WaitMasterPos: MasterPosition failed: %v", err) + } + if mpos.AtLeast(targetPos) { + return nil + } + + // Find the query to run, run it. + query, err := conn.WaitUntilPositionCommand(ctx, targetPos) + if err != nil { + return err + } + qr, err := executeFetchContext(ctx, conn, query, 1, true) + if err != nil { + return fmt.Errorf("WaitUntilPositionCommand(%v) failed: %v", query, err) + } + if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { + return fmt.Errorf("unexpected result format from WaitUntilPositionCommand(%v): %#v", query, qr) + } + result := qr.Rows[0][0] + if result.IsNull() { + return fmt.Errorf("WaitUntilPositionCommand(%v) failed: replication is probably stopped", query) + } + if result.ToString() == "-1" { + return fmt.Errorf("timed out waiting for position %v", targetPos) + } + return nil +} + +func executeFetchContext(ctx context.Context, conn *mysql.Conn, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) { + // Fast fail if context is done. + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + // Execute asynchronously so we can select on both it and the context. 
+ var qr *sqltypes.Result + var executeErr error + done := make(chan struct{}) + go func() { + defer close(done) + + qr, executeErr = conn.ExecuteFetch(query, maxrows, wantfields) + }() + + // Wait for either the query or the context to be done. + select { + case <-done: + return qr, executeErr + case <-ctx.Done(): + // If both are done already, we may end up here anyway because select + // chooses among multiple ready channels pseudorandomly. + // Check the done channel and prefer that one if it's ready. + select { + case <-done: + return qr, executeErr + default: + } + + // Wait for the conn.ExecuteFetch() call to return. + <-done + // Close the connection. Upon Recycle() it will be thrown out. + conn.Close() + // ExecuteFetch() may have succeeded before we tried to kill it. + // If ExecuteFetch() had returned because we cancelled it, + // then executeErr would be an error like "MySQL has gone away". + if executeErr == nil { + return qr, executeErr + } + return nil, ctx.Err() + } +} + // InitVStreamerClient initializes config for vstreamer client func InitVStreamerClient(cfg *dbconfigs.DBConfigs) { dbcfgs = cfg diff --git a/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client_test.go b/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client_test.go index e63bad6e668..18d2e45ed7d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client_test.go @@ -279,7 +279,7 @@ func TestNewMySQLVStreamerClient(t *testing.T) { { name: "sets conn params for MySQLVStreamerClient ", want: &MySQLVStreamerClient{ - sourceConnParams: env.Dbcfgs.ExternalReplWithDB(), + sourceCp: env.Dbcfgs.ExternalReplWithDB(), }, }, } @@ -332,7 +332,7 @@ func TestMySQLVStreamerClientOpen(t *testing.T) { for _, tcase := range tests { t.Run(tcase.name, func(t *testing.T) { vsClient := &MySQLVStreamerClient{ - sourceConnParams: tcase.fields.sourceConnParams, + sourceCp: tcase.fields.sourceConnParams, } err := 
vsClient.Open(tcase.args.ctx) @@ -390,8 +390,8 @@ func TestMySQLVStreamerClientClose(t *testing.T) { for _, tcase := range tests { t.Run(tcase.name, func(t *testing.T) { vsClient := &MySQLVStreamerClient{ - isOpen: tcase.fields.isOpen, - sourceConnParams: tcase.fields.sourceConnParams, + isOpen: tcase.fields.isOpen, + sourceCp: tcase.fields.sourceConnParams, } err := vsClient.Open(tcase.args.ctx) @@ -419,7 +419,7 @@ func TestMySQLVStreamerClientClose(t *testing.T) { func TestMySQLVStreamerClientVStream(t *testing.T) { vsClient := &MySQLVStreamerClient{ - sourceConnParams: env.Dbcfgs.ExternalReplWithDB(), + sourceCp: env.Dbcfgs.ExternalReplWithDB(), } filter := &binlogdatapb.Filter{ @@ -478,7 +478,7 @@ func TestMySQLVStreamerClientVStream(t *testing.T) { func TestMySQLVStreamerClientVStreamRows(t *testing.T) { vsClient := &MySQLVStreamerClient{ - sourceConnParams: env.Dbcfgs.ExternalReplWithDB(), + sourceCp: env.Dbcfgs.ExternalReplWithDB(), } eventsChan := make(chan *querypb.Row, 1000) diff --git a/go/vt/vttablet/tabletserver/api.go b/go/vt/vttablet/tabletserver/api.go new file mode 100644 index 00000000000..e4f975ee9e9 --- /dev/null +++ b/go/vt/vttablet/tabletserver/api.go @@ -0,0 +1,131 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package tabletserver + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" + + "vitess.io/vitess/go/acl" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtctl" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" +) + +// This file implements a REST-style API for the vtctld web interface. + +const ( + apiPrefix = "/api/" + + jsonContentType = "application/json; charset=utf-8" +) + +func httpErrorf(w http.ResponseWriter, r *http.Request, format string, args ...interface{}) { + errMsg := fmt.Sprintf(format, args...) + log.Errorf("HTTP error on %v: %v, request: %#v", r.URL.Path, errMsg, r) + http.Error(w, errMsg, http.StatusInternalServerError) +} + +func handleAPI(apiPath string, handlerFunc func(w http.ResponseWriter, r *http.Request) error) { + http.HandleFunc(apiPrefix+apiPath, func(w http.ResponseWriter, r *http.Request) { + defer func() { + if x := recover(); x != nil { + httpErrorf(w, r, "uncaught panic: %v", x) + } + }() + if err := handlerFunc(w, r); err != nil { + httpErrorf(w, r, "%v", err) + } + }) +} + +func handleCollection(collection string, getFunc func(*http.Request) (interface{}, error)) { + handleAPI(collection+"/", func(w http.ResponseWriter, r *http.Request) error { + // Get the requested object. + obj, err := getFunc(r) + if err != nil { + if topo.IsErrType(err, topo.NoNode) { + http.NotFound(w, r) + return nil + } + return fmt.Errorf("can't get %v: %v", collection, err) + } + + // JSON encode response. + data, err := vtctl.MarshalJSON(obj) + if err != nil { + return fmt.Errorf("cannot marshal data: %v", err) + } + w.Header().Set("Content-Type", jsonContentType) + w.Write(data) + return nil + }) +} + +func getItemPath(url string) string { + // Strip API prefix. + if !strings.HasPrefix(url, apiPrefix) { + return "" + } + url = url[len(apiPrefix):] + + // Strip collection name. 
+ parts := strings.SplitN(url, "/", 2) + if len(parts) != 2 { + return "" + } + return parts[1] +} + +func unmarshalRequest(r *http.Request, v interface{}) error { + data, err := ioutil.ReadAll(r.Body) + if err != nil { + return err + } + return json.Unmarshal(data, v) +} + +func InitAPI(vrEngine *vreplication.Engine) { + handleAPI("vdiff/start", func(w http.ResponseWriter, r *http.Request) error { + vrEngine.RunVdiff() + return nil + }) + + handleAPI("vdiff/abort", func(w http.ResponseWriter, r *http.Request) error { + vrEngine.AbortVdiff() + return nil + }) + + // Features + handleAPI("vdiff/status", func(w http.ResponseWriter, r *http.Request) error { + if err := acl.CheckAccessHTTP(r, acl.ADMIN); err != nil { + http.Error(w, "403 Forbidden", http.StatusForbidden) + return nil + } + resp, err := vrEngine.VDiffReportStatus() + if err != nil { + return fmt.Errorf("json error: %v", err) + } + w.Header().Set("Content-Type", jsonContentType) + w.Write(resp) + return nil + }) +} diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 19b54c572d6..7eb02a5cb3b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -240,7 +240,7 @@ func (vse *Engine) StreamResults(ctx context.Context, query string, send func(*b if !vse.isOpen { return nil, 0, errors.New("VStreamer is not open") } - resultStreamer := newResultStreamer(ctx, vse.cp, query, send) + resultStreamer := NewResultStreamer(ctx, vse.cp, query, send) idx := vse.streamIdx vse.resultStreamers[idx] = resultStreamer vse.streamIdx++ diff --git a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go index e79b1076434..e515834340f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go @@ -38,7 +38,8 @@ type resultStreamer struct { send 
func(*binlogdatapb.VStreamResultsResponse) error } -func newResultStreamer(ctx context.Context, cp *mysql.ConnParams, query string, send func(*binlogdatapb.VStreamResultsResponse) error) *resultStreamer { +// NewResultStreamer creates a new result streamer +func NewResultStreamer(ctx context.Context, cp *mysql.ConnParams, query string, send func(*binlogdatapb.VStreamResultsResponse) error) *resultStreamer { ctx, cancel := context.WithCancel(ctx) return &resultStreamer{ ctx: ctx, @@ -89,12 +90,14 @@ func (rs *resultStreamer) Stream() error { for { select { case <-rs.ctx.Done(): + log.Errorf("Stream ended due to: %v", rs.ctx.Err()) return fmt.Errorf("stream ended: %v", rs.ctx.Err()) default: } row, err := conn.FetchNext() if err != nil { + log.Errorf("1 - Stream ended due to: %v", err) return err } if row == nil { @@ -108,6 +111,7 @@ func (rs *resultStreamer) Stream() error { if byteCount >= *PacketSize { err = rs.send(response) if err != nil { + log.Errorf("2 - Stream ended due to: %v", err) return err } // empty the rows so we start over, but we keep the @@ -120,6 +124,7 @@ func (rs *resultStreamer) Stream() error { if len(response.Rows) > 0 { err = rs.send(response) if err != nil { + log.Errorf("3 - Stream ended due to: %v", err) return err } } @@ -143,7 +148,6 @@ func (rs *resultStreamer) startStreaming(conn *mysql.Conn) (string, error) { lockConn.Close() }() - log.Infof("Locking table %s for copying", rs.tableName) if _, err := lockConn.ExecuteFetch(fmt.Sprintf("lock tables %s read", sqlparser.String(rs.tableName)), 0, false); err != nil { return "", err } diff --git a/go/vt/wrangler/migrater_env_test.go b/go/vt/wrangler/migrater_env_test.go index 81cdc7a6a1e..88cba82655a 100644 --- a/go/vt/wrangler/migrater_env_test.go +++ b/go/vt/wrangler/migrater_env_test.go @@ -314,7 +314,7 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) { dbclient := newFakeDBClient() tme.dbSourceClients = append(tme.dbSourceClients, dbclient) 
dbClientFactory := func() binlogplayer.DBClient { return dbclient } - master.Agent.VREngine = vreplication.NewEngine(tme.ts, "", master.FakeMysqlDaemon, dbClientFactory, dbclient.DBName()) + master.Agent.VREngine = vreplication.NewEngine(tme.ts, "", nil, master.FakeMysqlDaemon, dbClientFactory, dbclient.DBName()) if err := master.Agent.VREngine.Open(ctx); err != nil { t.Fatal(err) } @@ -323,7 +323,7 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) { dbclient := newFakeDBClient() tme.dbTargetClients = append(tme.dbTargetClients, dbclient) dbClientFactory := func() binlogplayer.DBClient { return dbclient } - master.Agent.VREngine = vreplication.NewEngine(tme.ts, "", master.FakeMysqlDaemon, dbClientFactory, dbclient.DBName()) + master.Agent.VREngine = vreplication.NewEngine(tme.ts, "", nil, master.FakeMysqlDaemon, dbClientFactory, dbclient.DBName()) if err := master.Agent.VREngine.Open(ctx); err != nil { t.Fatal(err) } diff --git a/go/vt/wrangler/testlib/migrate_served_from_test.go b/go/vt/wrangler/testlib/migrate_served_from_test.go index 22912050631..ff3a38cb9a1 100644 --- a/go/vt/wrangler/testlib/migrate_served_from_test.go +++ b/go/vt/wrangler/testlib/migrate_served_from_test.go @@ -106,7 +106,7 @@ func TestMigrateServedFrom(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. 
dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - destMaster.Agent.VREngine = vreplication.NewEngine(ts, "", destMaster.FakeMysqlDaemon, dbClientFactory, dbClient.DBName()) + destMaster.Agent.VREngine = vreplication.NewEngine(ts, "", nil, destMaster.FakeMysqlDaemon, dbClientFactory, dbClient.DBName()) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := destMaster.Agent.VREngine.Open(context.Background()); err != nil { t.Fatal(err) diff --git a/go/vt/wrangler/testlib/migrate_served_types_test.go b/go/vt/wrangler/testlib/migrate_served_types_test.go index c7debf1cc6d..790272ec2a0 100644 --- a/go/vt/wrangler/testlib/migrate_served_types_test.go +++ b/go/vt/wrangler/testlib/migrate_served_types_test.go @@ -153,7 +153,7 @@ func TestMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. dbClient1 := binlogplayer.NewMockDBClient(t) dbClientFactory1 := func() binlogplayer.DBClient { return dbClient1 } - dest1Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest1Master.FakeMysqlDaemon, dbClientFactory1, dbClient1.DBName()) + dest1Master.Agent.VREngine = vreplication.NewEngine(ts, "", nil, dest1Master.FakeMysqlDaemon, dbClientFactory1, dbClient1.DBName()) // select * from _vt.vreplication during Open dbClient1.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest1Master.Agent.VREngine.Open(context.Background()); err != nil { @@ -181,7 +181,7 @@ func TestMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. 
dbClient2 := binlogplayer.NewMockDBClient(t) dbClientFactory2 := func() binlogplayer.DBClient { return dbClient2 } - dest2Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest2Master.FakeMysqlDaemon, dbClientFactory2, dbClient2.DBName()) + dest2Master.Agent.VREngine = vreplication.NewEngine(ts, "", nil, dest2Master.FakeMysqlDaemon, dbClientFactory2, dbClient2.DBName()) // select * from _vt.vreplication during Open dbClient2.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest2Master.Agent.VREngine.Open(context.Background()); err != nil { @@ -417,7 +417,7 @@ func TestMultiShardMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. dbClient1 := binlogplayer.NewMockDBClient(t) dbClientFactory1 := func() binlogplayer.DBClient { return dbClient1 } - dest1Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest1Master.FakeMysqlDaemon, dbClientFactory1, "db") + dest1Master.Agent.VREngine = vreplication.NewEngine(ts, "", nil, dest1Master.FakeMysqlDaemon, dbClientFactory1, "db") // select * from _vt.vreplication during Open dbClient1.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest1Master.Agent.VREngine.Open(context.Background()); err != nil { @@ -434,7 +434,7 @@ func TestMultiShardMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. 
dbClient2 := binlogplayer.NewMockDBClient(t) dbClientFactory2 := func() binlogplayer.DBClient { return dbClient2 } - dest2Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest2Master.FakeMysqlDaemon, dbClientFactory2, "db") + dest2Master.Agent.VREngine = vreplication.NewEngine(ts, "", nil, dest2Master.FakeMysqlDaemon, dbClientFactory2, "db") // select * from _vt.vreplication during Open dbClient2.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest2Master.Agent.VREngine.Open(context.Background()); err != nil { @@ -505,7 +505,7 @@ func TestMultiShardMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. dbClient1 = binlogplayer.NewMockDBClient(t) dbClientFactory1 = func() binlogplayer.DBClient { return dbClient1 } - dest3Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest3Master.FakeMysqlDaemon, dbClientFactory1, "db") + dest3Master.Agent.VREngine = vreplication.NewEngine(ts, "", nil, dest3Master.FakeMysqlDaemon, dbClientFactory1, "db") // select * from _vt.vreplication during Open dbClient1.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest3Master.Agent.VREngine.Open(context.Background()); err != nil { @@ -522,7 +522,7 @@ func TestMultiShardMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. 
dbClient2 = binlogplayer.NewMockDBClient(t) dbClientFactory2 = func() binlogplayer.DBClient { return dbClient2 } - dest4Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest4Master.FakeMysqlDaemon, dbClientFactory2, "db") + dest4Master.Agent.VREngine = vreplication.NewEngine(ts, "", nil, dest4Master.FakeMysqlDaemon, dbClientFactory2, "db") // select * from _vt.vreplication during Open dbClient2.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest4Master.Agent.VREngine.Open(context.Background()); err != nil { From 1c13207873aaffa719fe48d92506fee5163db73b Mon Sep 17 00:00:00 2001 From: Rafael Chacon Date: Thu, 9 Jan 2020 13:29:23 -0800 Subject: [PATCH 2/6] Removes extra log items Signed-off-by: Rafael Chacon --- go/vt/vttablet/tabletmanager/vreplication/controller.go | 1 - go/vt/vttablet/tabletmanager/vreplication/vdiff.go | 4 ++-- go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go | 4 ---- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index c038d9f5819..55444307593 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -142,7 +142,6 @@ func (ct *controller) run(ctx context.Context) { if err == nil { return } - // Sometimes, canceled contexts get wrapped as errors. 
select { case <-ctx.Done(): diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdiff.go b/go/vt/vttablet/tabletmanager/vreplication/vdiff.go index d0b5d0c2a54..69dc3db4c1b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vdiff.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vdiff.go @@ -440,10 +440,10 @@ func (df *vdiff) syncTargets(ctx context.Context, filteredReplicationWaitTime ti return err } - log.Infof("VReplication stopped") if err := df.tmc.VReplicationWaitForPos(waitCtx, df.targetDf.master.Tablet, int(df.id), pos); err != nil { return vterrors.Wrapf(err, "VReplicationWaitForPos for tablet %v", topoproto.TabletAliasString(df.targetDf.master.Tablet.Alias)) } + log.Infof("VReplication successfully stopped at position: %v", pos) pos, err := df.tmc.MasterPosition(ctx, df.targetDf.master.Tablet) if err != nil { @@ -644,7 +644,7 @@ func (td *tableDiffer) diff(ctx context.Context, sourceReader, targetReader *res if targetRow == nil { // no more rows from the target // we know we have rows from source, drain, update count - pk := td.pkString(targetRow) + pk := td.pkString(sourceRow) log.Errorf("Draining extra row(s) found on the source. 
This is the extra row pk: %v", pk) count, err := sourceExecutor.drain(ctx) if err != nil { diff --git a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go index e515834340f..ef54e5ad53b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go @@ -90,14 +90,12 @@ func (rs *resultStreamer) Stream() error { for { select { case <-rs.ctx.Done(): - log.Errorf("Stream ended due to: %v", rs.ctx.Err()) return fmt.Errorf("stream ended: %v", rs.ctx.Err()) default: } row, err := conn.FetchNext() if err != nil { - log.Errorf("1 - Stream ended due to: %v", err) return err } if row == nil { @@ -111,7 +109,6 @@ func (rs *resultStreamer) Stream() error { if byteCount >= *PacketSize { err = rs.send(response) if err != nil { - log.Errorf("2 - Stream ended due to: %v", err) return err } // empty the rows so we start over, but we keep the @@ -124,7 +121,6 @@ func (rs *resultStreamer) Stream() error { if len(response.Rows) > 0 { err = rs.send(response) if err != nil { - log.Errorf("3 - Stream ended due to: %v", err) return err } } From fbed963bb377f022f44252edcf8fb269b04c79fd Mon Sep 17 00:00:00 2001 From: Rafael Chacon Date: Thu, 9 Jan 2020 13:52:43 -0800 Subject: [PATCH 3/6] Print actual position where it actually stop Signed-off-by: Rafael Chacon --- go/vt/vttablet/tabletmanager/vreplication/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 87bebd3c648..8213f460ccd 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -653,7 +653,7 @@ func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error { } if current.AtLeast(mPos) { - log.Infof("position: %s reached, wait time: %v", pos, time.Since(start)) + log.Infof("position: %s 
reached, wait time: %v", current, time.Since(start)) return nil } From ccc4b5d96a86ab2e8475e9298a2009ae6673cd09 Mon Sep 17 00:00:00 2001 From: Rafael Chacon Date: Fri, 10 Jan 2020 11:21:45 -0800 Subject: [PATCH 4/6] Fixes bug in filepos flavor * Prior to this commit, the filepos flavor was using lexicographical comparison of the gtids. That was a bug in this context. Signed-off-by: Rafael Chacon --- go/mysql/binlog_event_filepos.go | 3 +- go/mysql/filepos_gtid.go | 13 +++- go/mysql/filepos_gtid_test.go | 102 +++++++++++++++++++++++++++++++ go/mysql/flavor_filepos.go | 25 +++++--- 4 files changed, 128 insertions(+), 15 deletions(-) create mode 100644 go/mysql/filepos_gtid_test.go diff --git a/go/mysql/binlog_event_filepos.go b/go/mysql/binlog_event_filepos.go index 9b2b6e1cef8..dfec653081e 100644 --- a/go/mysql/binlog_event_filepos.go +++ b/go/mysql/binlog_event_filepos.go @@ -19,7 +19,6 @@ package mysql import ( "encoding/binary" "fmt" - "strconv" ) // filePosBinlogEvent wraps a raw packet buffer and provides methods to examine @@ -228,7 +227,7 @@ func newFilePosGTIDEvent(file string, pos int, timestamp uint32) filePosGTIDEven }, gtid: filePosGTID{ file: file, - pos: strconv.Itoa(pos), + pos: pos, }, } } diff --git a/go/mysql/filepos_gtid.go b/go/mysql/filepos_gtid.go index 9894e405494..d261fc52717 100644 --- a/go/mysql/filepos_gtid.go +++ b/go/mysql/filepos_gtid.go @@ -18,6 +18,7 @@ package mysql import ( "fmt" + "strconv" "strings" ) @@ -31,9 +32,14 @@ func parseFilePosGTID(s string) (GTID, error) { return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting file:pos", s) } + pos, err := strconv.Atoi(parts[1]) + if err != nil { + return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting pos to be an integer", s) + } + return filePosGTID{ file: parts[0], - pos: parts[1], + pos: pos, }, nil } @@ -48,12 +54,13 @@ func parseFilePosGTIDSet(s string) (GTIDSet, error) { // filePosGTID implements GTID. 
type filePosGTID struct { - file, pos string + file string + pos int } // String implements GTID.String(). func (gtid filePosGTID) String() string { - return gtid.file + ":" + gtid.pos + return fmt.Sprintf("%s:%d", gtid.file, gtid.pos) } // Flavor implements GTID.Flavor(). diff --git a/go/mysql/filepos_gtid_test.go b/go/mysql/filepos_gtid_test.go new file mode 100644 index 00000000000..b3c98b74669 --- /dev/null +++ b/go/mysql/filepos_gtid_test.go @@ -0,0 +1,102 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package mysql + +import ( + "testing" +) + +func Test_filePosGTID_String(t *testing.T) { + type fields struct { + file string + pos int + } + tests := []struct { + name string + fields fields + want string + }{ + { + "formats gtid correctly", + fields{file: "mysql-bin.166031", pos: 192394}, + "mysql-bin.166031:192394", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gtid := filePosGTID{ + file: tt.fields.file, + pos: tt.fields.pos, + } + if got := gtid.String(); got != tt.want { + t.Errorf("filePosGTID.String() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_filePosGTID_ContainsGTID(t *testing.T) { + type fields struct { + file string + pos int + } + type args struct { + other GTID + } + tests := []struct { + name string + fields fields + args args + want bool + }{ + { + "returns true when the position is equal", + fields{file: "testfile", pos: 1234}, + args{other: filePosGTID{file: "testfile", pos: 1234}}, + true, + }, + { + "returns true when the position is less than equal", + fields{file: "testfile", pos: 1234}, + args{other: filePosGTID{file: "testfile", pos: 1233}}, + true, + }, + { + "returns false when the position is less than equal", + fields{file: "testfile", pos: 1234}, + args{other: filePosGTID{file: "testfile", pos: 1235}}, + false, + }, + { + "it uses integer value for comparison (it is not lexicographical order)", + fields{file: "testfile", pos: 99761227}, + args{other: filePosGTID{file: "testfile", pos: 103939867}}, + false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gtid := filePosGTID{ + file: tt.fields.file, + pos: tt.fields.pos, + } + if got := gtid.ContainsGTID(tt.args.other); got != tt.want { + t.Errorf("filePosGTID.ContainsGTID() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index 15d0fc572e8..fe5cc862c65 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -50,9 +50,14 
@@ func (flv *filePosFlavor) masterGTIDSet(c *Conn) (GTIDSet, error) { if err != nil { return nil, err } + pos, err := strconv.Atoi(resultMap["Position"]) + if err != nil { + return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting pos to be an integer", resultMap["Position"]) + } + return filePosGTID{ file: resultMap["File"], - pos: resultMap["Position"], + pos: pos, }, nil } @@ -71,13 +76,8 @@ func (flv *filePosFlavor) sendBinlogDumpCommand(c *Conn, slaveID uint32, startPo return fmt.Errorf("startPos.GTIDSet is wrong type - expected filePosGTID, got: %#v", startPos.GTIDSet) } - pos, err := strconv.Atoi(rpos.pos) - if err != nil { - return fmt.Errorf("invalid position: %v", startPos.GTIDSet) - } flv.file = rpos.file - - return c.WriteComBinlogDump(slaveID, rpos.file, uint32(pos), 0) + return c.WriteComBinlogDump(slaveID, rpos.file, uint32(rpos.pos), 0) } // readBinlogEvent is part of the Flavor interface. @@ -185,9 +185,14 @@ func (flv *filePosFlavor) status(c *Conn) (SlaveStatus, error) { } status := parseSlaveStatus(resultMap) + pos, err := strconv.Atoi(resultMap["Exec_Master_Log_Pos"]) + if err != nil { + return SlaveStatus{}, fmt.Errorf("invalid FilePos GTID (%v): expecting pos to be an integer", resultMap["Exec_Master_Log_Pos"]) + } + status.Position.GTIDSet = filePosGTID{ file: resultMap["Relay_Master_Log_File"], - pos: resultMap["Exec_Master_Log_Pos"], + pos: pos, } return status, nil } @@ -204,10 +209,10 @@ func (flv *filePosFlavor) waitUntilPositionCommand(ctx context.Context, pos Posi if timeout <= 0 { return "", fmt.Errorf("timed out waiting for position %v", pos) } - return fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %s, %.6f)", filePosPos.file, filePosPos.pos, timeout.Seconds()), nil + return fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d, %.6f)", filePosPos.file, filePosPos.pos, timeout.Seconds()), nil } - return fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %s)", filePosPos.file, filePosPos.pos), nil + return fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', 
%d)", filePosPos.file, filePosPos.pos), nil } func (*filePosFlavor) startSlaveUntilAfter(pos Position) string { From 779c483f8523e2c332dedac31672b9fd326d7640 Mon Sep 17 00:00:00 2001 From: Rafael Chacon Date: Fri, 10 Jan 2020 16:38:59 -0800 Subject: [PATCH 5/6] Updates how master gtid position is obtained for file:pos flavor When generating masterGTIDSet in file:pos most likely you will have a topology like the following: Source A -> Target B (B has a vreplication stream from A) From the target perspective, the source A is the master and you want to generate a gtid that is based on binlog file position of that server. As an example, let's see this topology: Master A -> Source B -> Target C (C has vreplication stream from B) Prior to this change, masterGTIDSet was returning the binlogfile:pos of A. But in reality, the Target C wants the position of B. Signed-off-by: Rafael Chacon --- go/mysql/flavor_filepos.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index fe5cc862c65..6f320c1b57b 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -44,7 +44,7 @@ func (flv *filePosFlavor) masterGTIDSet(c *Conn) (GTIDSet, error) { return nil, err } if len(qr.Rows) == 0 { - return nil, errors.New("no master or slave status") + return nil, errors.New("no master status") } resultMap, err := resultToMap(qr) if err != nil { From 58423f884bffedf550fecc09a37633236dfac195 Mon Sep 17 00:00:00 2001 From: Rafael Chacon Date: Tue, 14 Jan 2020 11:00:01 -0800 Subject: [PATCH 6/6] Cleanup per review * Address PR review + some other cleanup per linter Signed-off-by: Rafael Chacon --- .../tabletmanager/vreplication/controller.go | 2 - .../tabletmanager/vreplication/engine.go | 5 +- .../vreplication/vstreamer_client.go | 4 +- .../vreplication/vstreamer_client_test.go | 16 +----- go/vt/vttablet/tabletserver/api.go | 52 +------------------ 5 files changed, 8 insertions(+), 71 deletions(-) diff --git 
a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 55444307593..4d0df4839b3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -20,7 +20,6 @@ import ( "flag" "fmt" "strconv" - "sync" "time" "vitess.io/vitess/go/vt/discovery" @@ -45,7 +44,6 @@ var ( healthcheckRetryDelay = flag.Duration("vreplication_healthcheck_retry_delay", 5*time.Second, "healthcheck retry delay") healthcheckTimeout = flag.Duration("vreplication_healthcheck_timeout", 1*time.Minute, "healthcheck retry delay") retryDelay = flag.Duration("vreplication_retry_delay", 5*time.Second, "delay before retrying a failed binlog connection") - onlyOnceVdiff sync.Once ) // controller is created by Engine. Members are initialized upfront. diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 8213f460ccd..c575fd94a5e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -223,6 +223,7 @@ func (vre *Engine) IsOpen() bool { return vre.isOpen } +// RunVdiff starts a vdiff run on this tablet func (vre *Engine) RunVdiff() { vre.vdiffMu.Lock() defer vre.vdiffMu.Unlock() @@ -249,9 +250,9 @@ func (vre *Engine) RunVdiff() { vre.vdiffError = diffError vre.isVDiffRunning = false }() - return } +// AbortVdiff aborts current vdiff run func (vre *Engine) AbortVdiff() { vre.vdiffMu.Lock() defer vre.vdiffMu.Unlock() @@ -260,9 +261,9 @@ func (vre *Engine) AbortVdiff() { } vre.isVDiffRunning = false vre.vdiffCancel() - return } +// VDiffReportStatus returns status for current VDiff run func (vre *Engine) VDiffReportStatus() ([]byte, error) { vre.vdiffMu.Lock() defer vre.vdiffMu.Unlock() diff --git a/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go b/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go index 
1c5a383970c..8172112df1f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go @@ -147,7 +147,7 @@ func (vsClient *TabletVStreamerClient) VStreamRows(ctx context.Context, query st // VStreamResults part of the VStreamerClient interface func (vsClient *TabletVStreamerClient) VStreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error { if !vsClient.isOpen { - return errors.New("Can't VStreamRows without opening client") + return errors.New("can't VStreamRows without opening client") } vsClient.target.TabletType = topodatapb.TabletType_MASTER return vsClient.tsQueryService.VStreamResults(ctx, vsClient.target, query, send) @@ -239,7 +239,7 @@ func (vsClient *MySQLVStreamerClient) VStreamRows(ctx context.Context, query str // VStreamResults part of the VStreamerClient interface func (vsClient *MySQLVStreamerClient) VStreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error { if !vsClient.isOpen { - return errors.New("Can't VStreamRows without opening client") + return errors.New("can't VStreamRows without opening client") } streamer := vstreamer.NewResultStreamer(ctx, vsClient.sourceCp, query, send) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client_test.go b/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client_test.go index 18d2e45ed7d..5823acaf8bc 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client_test.go @@ -26,9 +26,6 @@ import ( "golang.org/x/net/context" "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/vt/vttablet/queryservice" - "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" - "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" @@ -40,10 +37,7 
@@ func TestTabletVStreamerClientOpen(t *testing.T) { defer deleteTablet(tablet) type fields struct { - isOpen bool - tablet *topodatapb.Tablet - target *querypb.Target - tsQueryService queryservice.QueryService + tablet *topodatapb.Tablet } type args struct { ctx context.Context @@ -100,10 +94,7 @@ func TestTabletVStreamerClientClose(t *testing.T) { defer deleteTablet(tablet) type fields struct { - isOpen bool - tablet *topodatapb.Tablet - target *querypb.Target - tsQueryService queryservice.QueryService + tablet *topodatapb.Tablet } type args struct { ctx context.Context @@ -294,7 +285,6 @@ func TestNewMySQLVStreamerClient(t *testing.T) { func TestMySQLVStreamerClientOpen(t *testing.T) { type fields struct { - isOpen bool sourceConnParams *mysql.ConnParams } type args struct { @@ -363,8 +353,6 @@ func TestMySQLVStreamerClientClose(t *testing.T) { type fields struct { isOpen bool sourceConnParams *mysql.ConnParams - vsEngine *vstreamer.Engine - sourceSe *schema.Engine } type args struct { ctx context.Context diff --git a/go/vt/vttablet/tabletserver/api.go b/go/vt/vttablet/tabletserver/api.go index e4f975ee9e9..7615fda7415 100644 --- a/go/vt/vttablet/tabletserver/api.go +++ b/go/vt/vttablet/tabletserver/api.go @@ -17,16 +17,11 @@ limitations under the License. package tabletserver import ( - "encoding/json" "fmt" - "io/ioutil" "net/http" - "strings" "vitess.io/vitess/go/acl" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/vtctl" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" ) @@ -57,52 +52,7 @@ func handleAPI(apiPath string, handlerFunc func(w http.ResponseWriter, r *http.R }) } -func handleCollection(collection string, getFunc func(*http.Request) (interface{}, error)) { - handleAPI(collection+"/", func(w http.ResponseWriter, r *http.Request) error { - // Get the requested object. 
- obj, err := getFunc(r) - if err != nil { - if topo.IsErrType(err, topo.NoNode) { - http.NotFound(w, r) - return nil - } - return fmt.Errorf("can't get %v: %v", collection, err) - } - - // JSON encode response. - data, err := vtctl.MarshalJSON(obj) - if err != nil { - return fmt.Errorf("cannot marshal data: %v", err) - } - w.Header().Set("Content-Type", jsonContentType) - w.Write(data) - return nil - }) -} - -func getItemPath(url string) string { - // Strip API prefix. - if !strings.HasPrefix(url, apiPrefix) { - return "" - } - url = url[len(apiPrefix):] - - // Strip collection name. - parts := strings.SplitN(url, "/", 2) - if len(parts) != 2 { - return "" - } - return parts[1] -} - -func unmarshalRequest(r *http.Request, v interface{}) error { - data, err := ioutil.ReadAll(r.Body) - if err != nil { - return err - } - return json.Unmarshal(data, v) -} - +// InitAPI initializes api for tabletserver func InitAPI(vrEngine *vreplication.Engine) { handleAPI("vdiff/start", func(w http.ResponseWriter, r *http.Request) error { vrEngine.RunVdiff()