Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 35 additions & 19 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

"golang.org/x/net/context"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
Expand All @@ -51,8 +50,7 @@ type Engine struct {
isOpen bool
controllers map[int]*controller
// wg is used by in-flight functions that can run for long periods.
wg sync.WaitGroup
mustCreate bool
wg sync.WaitGroup

// ctx is the root context for all controllers.
ctx context.Context
Expand Down Expand Up @@ -100,6 +98,34 @@ func (vre *Engine) Open(ctx context.Context) error {
return nil
}

// executeFetchMaybeCreateTable calls DBClient.ExecuteFetch and does one retry if
// there's a failure due to mysql.ERNoSuchTable or mysql.ERBadDb which can be fixed
// by re-creating the _vt.vreplication table.
func (vre *Engine) executeFetchMaybeCreateTable(dbClient binlogplayer.DBClient, query string, maxrows int) (qr *sqltypes.Result, err error) {
qr, err = dbClient.ExecuteFetch(query, maxrows)

if err == nil {
return
}

// If it's a bad table or db, it could be because _vt.vreplication wasn't created.
// In that case we can try creating it again.
merr, isSQLErr := err.(*mysql.SQLError)
if !isSQLErr || !(merr.Num == mysql.ERNoSuchTable || merr.Num == mysql.ERBadDb) {
return qr, err
}

log.Info("Looks like _vt.vreplication table may not exist. Trying to recreate... ")
for _, query := range binlogplayer.CreateVReplicationTable() {
if _, merr := dbClient.ExecuteFetch(query, 0); merr != nil {
log.Warningf("Failed to ensure _vt.vreplication table exists: %v", merr)
return nil, err
}
}

return dbClient.ExecuteFetch(query, maxrows)
}

func (vre *Engine) initAll() error {
dbClient := vre.dbClientFactory()
if err := dbClient.Connect(); err != nil {
Expand All @@ -110,8 +136,7 @@ func (vre *Engine) initAll() error {
rows, err := readAllRows(dbClient)
if err != nil {
// Handle Table not found.
if merr, ok := err.(*mysql.SQLError); ok && merr.Num == 1146 {
vre.mustCreate = true
if merr, ok := err.(*mysql.SQLError); ok && merr.Num == mysql.ERNoSuchTable {
log.Info("_vt.vreplication table not found. Will create it later if needed")
return nil
}
Expand Down Expand Up @@ -185,25 +210,16 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) {
}
defer dbClient.Close()

if vre.mustCreate {
for _, query := range binlogplayer.CreateVReplicationTable() {
if _, err := dbClient.ExecuteFetch(query, 0); err != nil {
return nil, err
}
}
vre.mustCreate = false
}

// Change the database to ensure that these events don't get
// replicated by another vreplication. This can happen when
// we reverse replication.
if _, err := dbClient.ExecuteFetch("use _vt", 1); err != nil {
if _, err := vre.executeFetchMaybeCreateTable(dbClient, "use _vt", 1); err != nil {
return nil, err
}

switch plan.opcode {
case insertQuery:
qr, err := dbClient.ExecuteFetch(plan.query, 1)
qr, err := vre.executeFetchMaybeCreateTable(dbClient, plan.query, 1)
if err != nil {
return nil, err
}
Expand All @@ -228,7 +244,7 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) {
ct.Stop()
blpStats = ct.blpStats
}
qr, err := dbClient.ExecuteFetch(plan.query, 1)
qr, err := vre.executeFetchMaybeCreateTable(dbClient, plan.query, 1)
if err != nil {
return nil, err
}
Expand All @@ -250,10 +266,10 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) {
ct.Stop()
delete(vre.controllers, plan.id)
}
return dbClient.ExecuteFetch(plan.query, 1)
return vre.executeFetchMaybeCreateTable(dbClient, plan.query, 1)
case selectQuery:
// select queries are passed through.
return dbClient.ExecuteFetch(plan.query, 10000)
return vre.executeFetchMaybeCreateTable(dbClient, plan.query, 10000)
}
panic("unreachable")
}
Expand Down
31 changes: 28 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -395,18 +394,39 @@ func TestCreateDBAndTable(t *testing.T) {

vre := NewEngine(ts, testCell, mysqld, dbClientFactory)

notFound := mysql.SQLError{Num: 1146, Message: "not found"}
dbClient.ExpectRequest("select * from _vt.vreplication", nil, &notFound)
tableNotFound := mysql.SQLError{Num: 1146, Message: "table not found"}
dbClient.ExpectRequest("select * from _vt.vreplication", nil, &tableNotFound)
if err := vre.Open(context.Background()); err != nil {
t.Fatal(err)
}
defer vre.Close()

// Missing db. Statement should get retried after creating everything.
dbNotFound := mysql.SQLError{Num: 1049, Message: "db not found"}
dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, &dbNotFound)

dbClient.ExpectRequest("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("DROP TABLE IF EXISTS _vt.blp_checkpoint", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("CREATE TABLE IF NOT EXISTS _vt.vreplication.*", &sqltypes.Result{}, nil)

dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)

// Non-recoverable error.
unrecoverableError := &mysql.SQLError{Num: 1234, Message: "random error"}
dbClient.ExpectRequest("select fail_query from _vt.vreplication", &sqltypes.Result{}, unrecoverableError)

// Missing table. Statement should get retried after creating everything.
dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{}, &tableNotFound)

dbClient.ExpectRequest("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{}, nil)
dbClient.ExpectRequest("DROP TABLE IF EXISTS _vt.blp_checkpoint", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("CREATE TABLE IF NOT EXISTS _vt.vreplication.*", &sqltypes.Result{}, nil)

dbClient.ExpectRequest("insert into _vt.vreplication values (null)", &sqltypes.Result{InsertID: 1}, nil)

// The rest of this test is normal with no db errors or extra queries.

dbClient.ExpectRequest("select * from _vt.vreplication where id = 1", sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|state|source",
Expand All @@ -421,6 +441,11 @@ func TestCreateDBAndTable(t *testing.T) {
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
dbClient.ExpectRequest("commit", nil, nil)

_, err := vre.Exec("select fail_query from _vt.vreplication")
if err != unrecoverableError {
t.Errorf("Want: %v, Got: %v", unrecoverableError, err)
}

qr, err := vre.Exec("insert into _vt.vreplication values(null)")
if err != nil {
t.Fatal(err)
Expand Down