diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 1b272473694..c499225cf4c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -24,7 +24,6 @@ import ( "time" "golang.org/x/net/context" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -51,8 +50,7 @@ type Engine struct { isOpen bool controllers map[int]*controller // wg is used by in-flight functions that can run for long periods. - wg sync.WaitGroup - mustCreate bool + wg sync.WaitGroup // ctx is the root context for all controllers. ctx context.Context @@ -100,6 +98,34 @@ func (vre *Engine) Open(ctx context.Context) error { return nil } +// executeFetchMaybeCreateTable calls DBClient.ExecuteFetch and does one retry if +// there's a failure due to mysql.ERNoSuchTable or mysql.ERBadDb which can be fixed +// by re-creating the _vt.vreplication table. +func (vre *Engine) executeFetchMaybeCreateTable(dbClient binlogplayer.DBClient, query string, maxrows int) (qr *sqltypes.Result, err error) { + qr, err = dbClient.ExecuteFetch(query, maxrows) + + if err == nil { + return + } + + // If it's a bad table or db, it could be because _vt.vreplication wasn't created. + // In that case we can try creating it again. + merr, isSQLErr := err.(*mysql.SQLError) + if !isSQLErr || !(merr.Num == mysql.ERNoSuchTable || merr.Num == mysql.ERBadDb) { + return qr, err + } + + log.Info("Looks like _vt.vreplication table may not exist. Trying to recreate... ") + for _, query := range binlogplayer.CreateVReplicationTable() { + if _, merr := dbClient.ExecuteFetch(query, 0); merr != nil { + log.Warningf("Failed to ensure _vt.vreplication table exists: %v", merr) + return nil, err + } + } + + return dbClient.ExecuteFetch(query, maxrows) +} + func (vre *Engine) initAll() error { dbClient := vre.dbClientFactory() if err := dbClient.Connect(); err != nil { @@ -110,8 +136,7 @@ func (vre *Engine) initAll() error { rows, err := readAllRows(dbClient) if err != nil { // Handle Table not found. - if merr, ok := err.(*mysql.SQLError); ok && merr.Num == 1146 { - vre.mustCreate = true + if merr, ok := err.(*mysql.SQLError); ok && merr.Num == mysql.ERNoSuchTable { log.Info("_vt.vreplication table not found. Will create it later if needed") return nil } @@ -185,25 +210,16 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) { } defer dbClient.Close() - if vre.mustCreate { - for _, query := range binlogplayer.CreateVReplicationTable() { - if _, err := dbClient.ExecuteFetch(query, 0); err != nil { - return nil, err - } - } - vre.mustCreate = false - } - // Change the database to ensure that these events don't get // replicated by another vreplication. This can happen when // we reverse replication. - if _, err := dbClient.ExecuteFetch("use _vt", 1); err != nil { + if _, err := vre.executeFetchMaybeCreateTable(dbClient, "use _vt", 1); err != nil { return nil, err } switch plan.opcode { case insertQuery: - qr, err := dbClient.ExecuteFetch(plan.query, 1) + qr, err := vre.executeFetchMaybeCreateTable(dbClient, plan.query, 1) if err != nil { return nil, err } @@ -228,7 +244,7 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) { ct.Stop() blpStats = ct.blpStats } - qr, err := dbClient.ExecuteFetch(plan.query, 1) + qr, err := vre.executeFetchMaybeCreateTable(dbClient, plan.query, 1) if err != nil { return nil, err } @@ -250,10 +266,10 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) { ct.Stop() delete(vre.controllers, plan.id) } - return dbClient.ExecuteFetch(plan.query, 1) + return vre.executeFetchMaybeCreateTable(dbClient, plan.query, 1) case selectQuery: // select queries are passed through. - return dbClient.ExecuteFetch(plan.query, 10000) + return vre.executeFetchMaybeCreateTable(dbClient, plan.query, 10000) } panic("unreachable") } diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go index 24c2140e5dd..f93ec5559e5 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go @@ -26,7 +26,6 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -395,18 +394,39 @@ func TestCreateDBAndTable(t *testing.T) { vre := NewEngine(ts, testCell, mysqld, dbClientFactory) - notFound := mysql.SQLError{Num: 1146, Message: "not found"} - dbClient.ExpectRequest("select * from _vt.vreplication", nil, ¬Found) + tableNotFound := mysql.SQLError{Num: 1146, Message: "table not found"} + dbClient.ExpectRequest("select * from _vt.vreplication", nil, &tableNotFound) if err := vre.Open(context.Background()); err != nil { t.Fatal(err) } defer vre.Close() + // Missing db. Statement should get retried after creating everything. + dbNotFound := mysql.SQLError{Num: 1049, Message: "db not found"} + dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, &dbNotFound) + dbClient.ExpectRequest("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("DROP TABLE IF EXISTS _vt.blp_checkpoint", &sqltypes.Result{}, nil) dbClient.ExpectRequestRE("CREATE TABLE IF NOT EXISTS _vt.vreplication.*", &sqltypes.Result{}, nil) + + dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) + + // Non-recoverable error. + unrecoverableError := &mysql.SQLError{Num: 1234, Message: "random error"} + dbClient.ExpectRequest("select fail_query from _vt.vreplication", &sqltypes.Result{}, unrecoverableError) + + // Missing table. Statement should get retried after creating everything. dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) + dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{}, &tableNotFound) + + dbClient.ExpectRequest("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{}, nil) + dbClient.ExpectRequest("DROP TABLE IF EXISTS _vt.blp_checkpoint", &sqltypes.Result{}, nil) + dbClient.ExpectRequestRE("CREATE TABLE IF NOT EXISTS _vt.vreplication.*", &sqltypes.Result{}, nil) + dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{InsertID: 1}, nil) + + // The rest of this test is normal with no db errors or extra queries. + dbClient.ExpectRequest("select * from _vt.vreplication where id = 1", sqltypes.MakeTestResult( sqltypes.MakeTestFields( "id|state|source", @@ -421,6 +441,11 @@ func TestCreateDBAndTable(t *testing.T) { dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil) dbClient.ExpectRequest("commit", nil, nil) + _, err := vre.Exec("select fail_query from _vt.vreplication") + if err != unrecoverableError { + t.Errorf("Want: %v, Got: %v", unrecoverableError, err) + } + qr, err := vre.Exec("insert into _vt.vreplication values(null)") if err != nil { t.Fatal(err)