diff --git a/go/mysql/fakesqldb/server.go b/go/mysql/fakesqldb/server.go index cfd524334f3..71bf08f06c8 100644 --- a/go/mysql/fakesqldb/server.go +++ b/go/mysql/fakesqldb/server.go @@ -116,6 +116,9 @@ type DB struct { // connections tracks all open connections. // The key for the map is the value of mysql.Conn.ConnectionID. connections map[uint32]*mysql.Conn + + // queryPatternUserCallback stores optional callbacks when a query with a pattern is called + queryPatternUserCallback map[*regexp.Regexp]func(string) } // QueryHandler is the interface used by the DB to simulate executed queries @@ -157,13 +160,14 @@ func New(t *testing.T) *DB { // Create our DB. db := &DB{ - t: t, - socketFile: socketFile, - name: "fakesqldb", - data: make(map[string]*ExpectedResult), - rejectedData: make(map[string]error), - queryCalled: make(map[string]int), - connections: make(map[uint32]*mysql.Conn), + t: t, + socketFile: socketFile, + name: "fakesqldb", + data: make(map[string]*ExpectedResult), + rejectedData: make(map[string]error), + queryCalled: make(map[string]int), + connections: make(map[uint32]*mysql.Conn), + queryPatternUserCallback: make(map[*regexp.Regexp]func(string)), } db.Handler = db @@ -344,7 +348,6 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R defer db.mu.Unlock() db.queryCalled[key]++ db.querylog = append(db.querylog, key) - // Check if we should close the connection and provoke errno 2013. if db.shouldClose { c.Close() @@ -384,6 +387,10 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R // Check query patterns from AddQueryPattern(). for _, pat := range db.patternData { if pat.expr.MatchString(query) { + userCallback, ok := db.queryPatternUserCallback[pat.expr] + if ok { + userCallback(query) + } return callback(pat.result) } } @@ -500,6 +507,13 @@ func (db *DB) AddQueryPattern(queryPattern string, expectedResult *sqltypes.Resu db.patternData = append(db.patternData, exprResult{expr, &result}) } +// AddQueryPatternWithCallback is similar to AddQueryPattern: in addition it calls the provided callback function +// The callback can be used to set user counters/variables for testing specific usecases +func (db *DB) AddQueryPatternWithCallback(queryPattern string, expectedResult *sqltypes.Result, callback func(string)) { + db.AddQueryPattern(queryPattern, expectedResult) + db.queryPatternUserCallback[db.patternData[len(db.patternData)-1].expr] = callback +} + // DeleteQuery deletes query from the fake DB. func (db *DB) DeleteQuery(query string) { db.mu.Lock() diff --git a/go/test/endtoend/cluster/vtctlclient_process.go b/go/test/endtoend/cluster/vtctlclient_process.go index edaa2fe9a52..06e82af84a6 100644 --- a/go/test/endtoend/cluster/vtctlclient_process.go +++ b/go/test/endtoend/cluster/vtctlclient_process.go @@ -36,11 +36,15 @@ type VtctlClientProcess struct { // InitShardMaster executes vtctlclient command to make one of tablet as master func (vtctlclient *VtctlClientProcess) InitShardMaster(Keyspace string, Shard string, Cell string, TabletUID int) (err error) { - return vtctlclient.ExecuteCommand( + output, err := vtctlclient.ExecuteCommandWithOutput( "InitShardMaster", "-force", fmt.Sprintf("%s/%s", Keyspace, Shard), fmt.Sprintf("%s-%d", Cell, TabletUID)) + if err != nil { + log.Errorf("error in InitShardMaster output %s, err %s", output, err.Error()) + } + return err } // ApplySchema applies SQL schema to the keyspace @@ -73,7 +77,11 @@ func (vtctlclient *VtctlClientProcess) ExecuteCommand(args ...string) (err error pArgs..., ) log.Infof("Executing vtctlclient with command: %v", strings.Join(tmpProcess.Args, " ")) - return tmpProcess.Run() + output, err := tmpProcess.Output() + if err != nil { + log.Errorf("Error executing %s: output %s, err %v", strings.Join(tmpProcess.Args, " "), output, err) + } + return err } // ExecuteCommandWithOutput executes any vtctlclient command and returns output diff --git a/go/vt/vttablet/endtoend/vstreamer_test.go b/go/vt/vttablet/endtoend/vstreamer_test.go index 67e7aac05bf..b2c2d026150 100644 --- a/go/vt/vttablet/endtoend/vstreamer_test.go +++ b/go/vt/vttablet/endtoend/vstreamer_test.go @@ -320,94 +320,6 @@ func TestSchemaVersioning(t *testing.T) { log.Info("=== END OF TEST") } -func TestSchemaVersioningLongDDL(t *testing.T) { - // Let's disable the already running tracker to prevent it from - // picking events from the previous test, and then re-enable it at the end. - tsv := framework.Server - tsv.EnableHistorian(false) - tsv.SetTracking(false) - defer tsv.EnableHistorian(true) - defer tsv.SetTracking(true) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - tsv.EnableHistorian(true) - tsv.SetTracking(true) - - target := &querypb.Target{ - Keyspace: "vttest", - Shard: "0", - TabletType: tabletpb.TabletType_MASTER, - Cell: "", - } - filter := &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "/.*/", - }}, - } - longDDL := "create table vitess_version (" - for i := 0; i < 100; i++ { - col := fmt.Sprintf("id%d_%s int", i, strings.Repeat("0", 10)) - if i != 99 { - col += ", " - } - longDDL += col - } - longDDL += ")" - - var cases = []test{ - { - query: longDDL, - output: append(append([]string{ - `gtid`, //gtid+other => vstream current pos - `other`, - `gtid`, //gtid+ddl => actual query - fmt.Sprintf(`type:DDL ddl:"%s" `, longDDL)}, - getSchemaVersionTableCreationEvents()...), - `version`, - `gtid`, - ), - }, - } - eventCh := make(chan []*binlogdatapb.VEvent) - var startPos string - send := func(events []*binlogdatapb.VEvent) error { - var evs []*binlogdatapb.VEvent - for _, event := range events { - if event.Type == binlogdatapb.VEventType_GTID { - if startPos == "" { - startPos = event.Gtid - } - } - if event.Type == binlogdatapb.VEventType_HEARTBEAT { - continue - } - log.Infof("Received event %v", event) - evs = append(evs, event) - } - select { - case eventCh <- evs: - case <-ctx.Done(): - return nil - } - return nil - } - go func() { - defer close(eventCh) - if err := tsv.VStream(ctx, target, "current", nil, filter, send); err != nil { - fmt.Printf("Error in tsv.VStream: %v", err) - t.Error(err) - } - }() - runCases(ctx, t, cases, eventCh) - - cancel() - - client := framework.NewClient() - client.Execute("drop table vitess_version", nil) - client.Execute("drop table _vt.schema_version", nil) -} - func runCases(ctx context.Context, t *testing.T, tests []test, eventCh chan []*binlogdatapb.VEvent) { client := framework.NewClient() diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index ea9abaf0221..35abb422ca0 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/sqlparser" "github.com/gogo/protobuf/proto" @@ -97,6 +99,7 @@ func (tr *Tracker) Open() { tr.cancel = cancel tr.wg.Add(1) log.Info("Schema tracker enabled.") + go tr.process(ctx) } @@ -130,6 +133,10 @@ func (tr *Tracker) Enable(enabled bool) { func (tr *Tracker) process(ctx context.Context) { defer tr.env.LogError() defer tr.wg.Done() + if err := tr.possiblyInsertInitialSchema(ctx); err != nil { + log.Errorf("possiblyInsertInitialSchema eror: %v", err) + return + } filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -163,14 +170,69 @@ func (tr *Tracker) process(ctx context.Context) { } } +func (tr *Tracker) currentPosition(ctx context.Context) (mysql.Position, error) { + conn, err := tr.engine.cp.Connect(ctx) + if err != nil { + return mysql.Position{}, err + } + defer conn.Close() + return conn.MasterPosition() +} + +func (tr *Tracker) isSchemaVersionTableEmpty(ctx context.Context) (bool, error) { + conn, err := tr.engine.GetConnection(ctx) + if err != nil { + return false, err + } + defer conn.Recycle() + result, err := withDDL.Exec(ctx, "select id from _vt.schema_version limit 1", conn.Exec) + if err != nil { + return false, err + } + if len(result.Rows) == 0 { + return true, nil + } + return false, nil +} + +// possiblyInsertInitialSchema stores the latest schema when a tracker starts and the schema_version table is empty +// this enables the right schema to be available between the time the tracker starts first and the first DDL is applied +func (tr *Tracker) possiblyInsertInitialSchema(ctx context.Context) error { + var err error + needsWarming, err := tr.isSchemaVersionTableEmpty(ctx) + if err != nil { + return err + } + if !needsWarming { // _vt.schema_version is not empty, nothing to do here + return nil + } + if err = tr.engine.Reload(ctx); err != nil { + return err + } + + timestamp := time.Now().UnixNano() / 1e9 + ddl := "" + pos, err := tr.currentPosition(ctx) + if err != nil { + return err + } + gtid := mysql.EncodePosition(pos) + log.Infof("Saving initial schema for gtid %s", gtid) + + return tr.saveCurrentSchemaToDb(ctx, gtid, ddl, timestamp) +} + func (tr *Tracker) schemaUpdated(gtid string, ddl string, timestamp int64) error { log.Infof("Processing schemaUpdated event for gtid %s, ddl %s", gtid, ddl) if gtid == "" || ddl == "" { return fmt.Errorf("got invalid gtid or ddl in schemaUpdated") } ctx := context.Background() - // Engine will have reloaded the schema because vstream will reload it on a DDL + return tr.saveCurrentSchemaToDb(ctx, gtid, ddl, timestamp) +} + +func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, timestamp int64) error { tables := tr.engine.GetSchema() dbSchema := &binlogdatapb.MinimalSchema{ Tables: []*binlogdatapb.MinimalTable{}, diff --git a/go/vt/vttablet/tabletserver/schema/tracker_test.go b/go/vt/vttablet/tabletserver/schema/tracker_test.go index 3bbe1d63ba4..5435b7fa5dd 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker_test.go +++ b/go/vt/vttablet/tabletserver/schema/tracker_test.go @@ -19,7 +19,8 @@ package schema import ( "testing" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -27,16 +28,26 @@ import ( ) func TestTracker(t *testing.T) { + initialSchemaInserted := false se, db, cancel := getTestSchemaEngine(t) defer cancel() - gtid1 := "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10" ddl1 := "create table tracker_test (id int)" query := "CREATE TABLE IF NOT EXISTS _vt.schema_version.*" db.AddQueryPattern(query, &sqltypes.Result{}) - db.AddQueryPattern("insert into _vt.schema_version.*", &sqltypes.Result{}) - + db.AddQueryPattern("insert into _vt.schema_version.*1-10.*", &sqltypes.Result{}) + db.AddQueryPatternWithCallback("insert into _vt.schema_version.*1-3.*", &sqltypes.Result{}, func(query string) { + initialSchemaInserted = true + }) + // simulates empty schema_version table, so initial schema should be inserted + db.AddQuery("select id from _vt.schema_version limit 1", &sqltypes.Result{Rows: [][]sqltypes.Value{}}) + // called to get current position + db.AddQuery("SELECT @@GLOBAL.gtid_executed", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "", + "varchar"), + "7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-3", + )) vs := &fakeVstreamer{ done: make(chan struct{}), events: [][]*binlogdatapb.VEvent{{ @@ -74,7 +85,49 @@ func TestTracker(t *testing.T) { tracker.Close() // Two of those events should have caused an error. final := env.Stats().ErrorCounters.Counts()["INTERNAL"] - assert.Equal(t, initial+2, final) + require.Equal(t, initial+2, final) + require.True(t, initialSchemaInserted) +} + +func TestTrackerShouldNotInsertInitialSchema(t *testing.T) { + initialSchemaInserted := false + se, db, cancel := getTestSchemaEngine(t) + gtid1 := "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10" + + defer cancel() + // simulates existing rows in schema_version, so initial schema should not be inserted + db.AddQuery("select id from _vt.schema_version limit 1", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id", + "int"), + "1", + )) + // called to get current position + db.AddQuery("SELECT @@GLOBAL.gtid_executed", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "", + "varchar"), + "7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-3", + )) + db.AddQueryPatternWithCallback("insert into _vt.schema_version.*1-3.*", &sqltypes.Result{}, func(query string) { + initialSchemaInserted = true + }) + vs := &fakeVstreamer{ + done: make(chan struct{}), + events: [][]*binlogdatapb.VEvent{{ + { + Type: binlogdatapb.VEventType_GTID, + Gtid: gtid1, + }, + }}, + } + config := se.env.Config() + config.TrackSchemaVersions = true + env := tabletenv.NewEnv(config, "TrackerTest") + tracker := NewTracker(env, vs, se) + tracker.Open() + <-vs.done + cancel() + tracker.Close() + require.False(t, initialSchemaInserted) } var _ VStreamer = (*fakeVstreamer)(nil)