Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions config/init_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ CREATE DATABASE IF NOT EXISTS _vt;
CREATE TABLE IF NOT EXISTS _vt.local_metadata (
name VARCHAR(255) NOT NULL,
value VARCHAR(255) NOT NULL,
PRIMARY KEY (name)
db_name VARBINARY(255) NOT NULL,
PRIMARY KEY (db_name, name)
) ENGINE=InnoDB;
CREATE TABLE IF NOT EXISTS _vt.shard_metadata (
name VARCHAR(255) NOT NULL,
value MEDIUMBLOB NOT NULL,
PRIMARY KEY (name)
db_name VARBINARY(255) NOT NULL,
PRIMARY KEY (db_name, name)
) ENGINE=InnoDB;

# Admin user with all privileges.
Expand Down
25 changes: 16 additions & 9 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,15 @@ func CreateVReplicationTable() []string {
transaction_timestamp BIGINT(20) NOT NULL,
state VARBINARY(100) NOT NULL,
message VARBINARY(1000) DEFAULT NULL,
db_name VARBINARY(255) NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB`}
) ENGINE=InnoDB`,
}
}

// AlterVReplicationTable adds new columns to vreplication table
func AlterVReplicationTable() []string {
return []string{"ALTER TABLE _vt.vreplication ADD COLUMN db_name VARBINARY(255) NOT NULL"}
}

// SetVReplicationState updates the state in the _vt.vreplication table.
Expand Down Expand Up @@ -558,19 +565,19 @@ func ReadVRSettings(dbClient DBClient, uid uint32) (VRSettings, error) {

// CreateVReplication returns a statement to populate the first value into
// the _vt.vreplication table.
func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, position string, maxTPS, maxReplicationLag, timeUpdated int64) string {
func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, position string, maxTPS, maxReplicationLag, timeUpdated int64, dbName string) string {
return fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v')",
encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag, timeUpdated, BlpRunning)
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v)",
encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag, timeUpdated, BlpRunning, encodeString(dbName))
}

// CreateVReplicationState returns a statement to create a stopped vreplication.
func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, position, state string) string {
func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, position, state string, dbName string) string {
return fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v')",
encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), state)
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v)",
encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), state, encodeString(dbName))
}

// GenerateUpdatePos returns a statement to update a value in the
Expand Down
12 changes: 6 additions & 6 deletions go/vt/binlog/binlogplayer/binlog_player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ func applyEvents(blp *BinlogPlayer) func() error {

func TestCreateVReplicationKeyRange(t *testing.T) {
want := "insert into _vt.vreplication " +
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state) " +
`values ('Resharding', 'keyspace:\"ks\" shard:\"0\" key_range:<end:\"\\200\" > ', 'MariaDB/0-1-1083', 9223372036854775807, 9223372036854775807, 481823, 0, 'Running')`
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name) " +
`values ('Resharding', 'keyspace:\"ks\" shard:\"0\" key_range:<end:\"\\200\" > ', 'MariaDB/0-1-1083', 9223372036854775807, 9223372036854775807, 481823, 0, 'Running', 'db')`

bls := binlogdatapb.BinlogSource{
Keyspace: "ks",
Expand All @@ -331,24 +331,24 @@ func TestCreateVReplicationKeyRange(t *testing.T) {
},
}

got := CreateVReplication("Resharding", &bls, "MariaDB/0-1-1083", throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, 481823)
got := CreateVReplication("Resharding", &bls, "MariaDB/0-1-1083", throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, 481823, "db")
if got != want {
t.Errorf("CreateVReplication() =\n%v, want\n%v", got, want)
}
}

func TestCreateVReplicationTables(t *testing.T) {
want := "insert into _vt.vreplication " +
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state) " +
`values ('Resharding', 'keyspace:\"ks\" shard:\"0\" tables:\"a\" tables:\"b\" ', 'MariaDB/0-1-1083', 9223372036854775807, 9223372036854775807, 481823, 0, 'Running')`
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name) " +
`values ('Resharding', 'keyspace:\"ks\" shard:\"0\" tables:\"a\" tables:\"b\" ', 'MariaDB/0-1-1083', 9223372036854775807, 9223372036854775807, 481823, 0, 'Running', 'db')`

bls := binlogdatapb.BinlogSource{
Keyspace: "ks",
Shard: "0",
Tables: []string{"a", "b"},
}

got := CreateVReplication("Resharding", &bls, "MariaDB/0-1-1083", throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, 481823)
got := CreateVReplication("Resharding", &bls, "MariaDB/0-1-1083", throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, 481823, "db")
if got != want {
t.Errorf("CreateVReplication() =\n%v, want\n%v", got, want)
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/mysqlctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func Restore(
}
if !ok {
logger.Infof("Auto-restore is enabled, but mysqld already contains data. Assuming vttablet was just restarted.")
if err = PopulateMetadataTables(mysqld, localMetadata); err == nil {
if err = PopulateMetadataTables(mysqld, localMetadata, dbName); err == nil {
err = ErrExistingDB
}
return mysql.Position{}, err
Expand Down Expand Up @@ -263,7 +263,7 @@ func Restore(
err = ErrNoBackup
}

if err2 := PopulateMetadataTables(mysqld, localMetadata); err2 == nil {
if err2 := PopulateMetadataTables(mysqld, localMetadata, dbName); err2 == nil {
err = ErrNoBackup
}
return mysql.Position{}, err
Expand Down Expand Up @@ -299,7 +299,7 @@ func Restore(
// Populate local_metadata before starting without --skip-networking,
// so it's there before we start announcing ourselves.
logger.Infof("Restore: populating local_metadata")
err = PopulateMetadataTables(mysqld, localMetadata)
err = PopulateMetadataTables(mysqld, localMetadata, dbName)
if err != nil {
return mysql.Position{}, err
}
Expand Down
54 changes: 50 additions & 4 deletions go/vt/mysqlctl/metadata_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,40 @@ package mysqlctl

import (
"bytes"
"fmt"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
)

// Note that definitions of local_metadata and shard_metadata should be the same
// as in testing which is defined in config/init_db.sql.
const sqlCreateLocalMetadataTable = `CREATE TABLE IF NOT EXISTS _vt.local_metadata (
const (
sqlCreateLocalMetadataTable = `CREATE TABLE IF NOT EXISTS _vt.local_metadata (
name VARCHAR(255) NOT NULL,
value VARCHAR(255) NOT NULL,
PRIMARY KEY (name)
) ENGINE=InnoDB`
const sqlCreateShardMetadataTable = `CREATE TABLE IF NOT EXISTS _vt.shard_metadata (
sqlCreateShardMetadataTable = `CREATE TABLE IF NOT EXISTS _vt.shard_metadata (
name VARCHAR(255) NOT NULL,
value MEDIUMBLOB NOT NULL,
PRIMARY KEY (name)
) ENGINE=InnoDB`
sqlUpdateLocalMetadataTable = "UPDATE _vt.local_metadata SET db_name='%s' WHERE db_name=''"
sqlUpdateShardMetadataTable = "UPDATE _vt.shard_metadata SET db_name='%s' WHERE db_name=''"
)

var (
sqlAlterLocalMetadataTable = []string{
`ALTER TABLE _vt.local_metadata ADD COLUMN db_name VARBINARY(255) NOT NULL`,
`ALTER TABLE _vt.local_metadata DROP PRIMARY KEY, ADD PRIMARY KEY(name, db_name)`,
}
sqlAlterShardMetadataTable = []string{
`ALTER TABLE _vt.shard_metadata ADD COLUMN db_name VARBINARY(255) NOT NULL`,
`ALTER TABLE _vt.shard_metadata DROP PRIMARY KEY, ADD PRIMARY KEY(name, db_name)`,
}
)

// PopulateMetadataTables creates and fills the _vt.local_metadata table and
// creates _vt.shard_metadata table. _vt.local_metadata table is
Expand All @@ -46,7 +63,7 @@ const sqlCreateShardMetadataTable = `CREATE TABLE IF NOT EXISTS _vt.shard_metada
// created here to make it easier to create it on databases that were running
// old version of Vitess, or databases that are getting converted to run under
// Vitess.
func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string) error {
func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error {
log.Infof("Populating _vt.local_metadata table...")

// Get a non-pooled DBA connection.
Expand All @@ -69,9 +86,35 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string)
if _, err := conn.ExecuteFetch(sqlCreateLocalMetadataTable, 0, false); err != nil {
return err
}
for _, sql := range sqlAlterLocalMetadataTable {
if _, err := conn.ExecuteFetch(sql, 0, false); err != nil {
if merr, ok := err.(*mysql.SQLError); ok && merr.Num == mysql.ERDupFieldName {
log.Errorf("Expected error executing %v: %v", sql, err)
} else {
log.Errorf("Unexpected error executing %v: %v", sql, err)
return err
}
}
}
if _, err := conn.ExecuteFetch(fmt.Sprintf(sqlUpdateLocalMetadataTable, dbName), 0, false); err != nil {
return err
}
if _, err := conn.ExecuteFetch(sqlCreateShardMetadataTable, 0, false); err != nil {
return err
}
for _, sql := range sqlAlterShardMetadataTable {
if _, err := conn.ExecuteFetch(sql, 0, false); err != nil {
if merr, ok := err.(*mysql.SQLError); ok && merr.Num == mysql.ERDupFieldName {
log.Errorf("Expected error executing %v: %v", sql, err)
} else {
log.Errorf("Unexpected error executing %v: %v", sql, err)
return err
}
}
}
if _, err := conn.ExecuteFetch(fmt.Sprintf(sqlUpdateShardMetadataTable, dbName), 0, false); err != nil {
return err
}

// Populate local_metadata from the passed list of values.
if _, err := conn.ExecuteFetch("BEGIN", 0, false); err != nil {
Expand All @@ -80,12 +123,15 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string)
for name, val := range localMetadata {
nameValue := sqltypes.NewVarChar(name)
valValue := sqltypes.NewVarChar(val)
dbNameValue := sqltypes.NewVarBinary(dbName)

queryBuf := bytes.Buffer{}
queryBuf.WriteString("INSERT INTO _vt.local_metadata (name,value) VALUES (")
queryBuf.WriteString("INSERT INTO _vt.local_metadata (name,value, db_name) VALUES (")
nameValue.EncodeSQL(&queryBuf)
queryBuf.WriteByte(',')
valValue.EncodeSQL(&queryBuf)
queryBuf.WriteByte(',')
dbNameValue.EncodeSQL(&queryBuf)
queryBuf.WriteString(") ON DUPLICATE KEY UPDATE value = ")
valValue.EncodeSQL(&queryBuf)

Expand Down
21 changes: 12 additions & 9 deletions go/vt/schemamanager/schemaswap/schema_swap.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
workflowpb "vitess.io/vitess/go/vt/proto/workflow"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtctl"
"vitess.io/vitess/go/vt/vttablet/tmclient"
"vitess.io/vitess/go/vt/workflow"
Expand Down Expand Up @@ -603,8 +604,8 @@ func (shardSwap *shardSchemaSwap) readShardMetadata(metadata *shardSwapMetadata,
return
}
query := fmt.Sprintf(
"SELECT name, value FROM _vt.shard_metadata WHERE name in ('%s', '%s', '%s')",
lastStartedMetadataName, lastFinishedMetadataName, currentSQLMetadataName)
"SELECT name, value FROM _vt.shard_metadata WHERE db_name = '%s' and name in ('%s', '%s', '%s')",
topoproto.TabletDbName(tablet), lastStartedMetadataName, lastFinishedMetadataName, currentSQLMetadataName)
queryResult, err := shardSwap.executeAdminQuery(tablet, query, 3 /* maxRows */)
if err != nil {
metadata.err = err
Expand Down Expand Up @@ -640,7 +641,9 @@ func (shardSwap *shardSchemaSwap) writeStartedSwap() error {
return err
}
queryBuf := bytes.Buffer{}
queryBuf.WriteString("INSERT INTO _vt.shard_metadata (name, value) VALUES ('")
queryBuf.WriteString("INSERT INTO _vt.shard_metadata (db_name, name, value) VALUES ('")
queryBuf.WriteString(topoproto.TabletDbName(tablet))
queryBuf.WriteString("',")
queryBuf.WriteString(currentSQLMetadataName)
queryBuf.WriteString("',")
sqlValue := sqltypes.NewVarChar(shardSwap.parent.sql)
Expand All @@ -666,13 +669,13 @@ func (shardSwap *shardSchemaSwap) writeFinishedSwap() error {
return err
}
query := fmt.Sprintf(
"INSERT INTO _vt.shard_metadata (name, value) VALUES ('%s', '%d') ON DUPLICATE KEY UPDATE value = '%d'",
lastFinishedMetadataName, shardSwap.parent.swapID, shardSwap.parent.swapID)
"INSERT INTO _vt.shard_metadata (db_name, name, value) VALUES ('%s', '%s', '%d') ON DUPLICATE KEY UPDATE value = '%d'",
topoproto.TabletDbName(tablet), lastFinishedMetadataName, shardSwap.parent.swapID, shardSwap.parent.swapID)
_, err = shardSwap.executeAdminQuery(tablet, query, 0 /* maxRows */)
if err != nil {
return err
}
query = fmt.Sprintf("DELETE FROM _vt.shard_metadata WHERE name = '%s'", currentSQLMetadataName)
query = fmt.Sprintf("DELETE FROM _vt.shard_metadata WHERE db_name = '%s' AND name = '%s'", topoproto.TabletDbName(tablet), currentSQLMetadataName)
_, err = shardSwap.executeAdminQuery(tablet, query, 0 /* maxRows */)
return err
}
Expand Down Expand Up @@ -896,7 +899,7 @@ func (shardSwap *shardSchemaSwap) executeAdminQuery(tablet *topodatapb.Tablet, q
func (shardSwap *shardSchemaSwap) isSwapApplied(tablet *topodatapb.Tablet) (bool, error) {
swapIDResult, err := shardSwap.executeAdminQuery(
tablet,
fmt.Sprintf("SELECT value FROM _vt.local_metadata WHERE name = '%s'", lastAppliedMetadataName),
fmt.Sprintf("SELECT value FROM _vt.local_metadata WHERE db_name = '%s' AND name = '%s'", topoproto.TabletDbName(tablet), lastAppliedMetadataName),
1 /* maxRows */)
if err != nil {
return false, err
Expand Down Expand Up @@ -1036,8 +1039,8 @@ func (shardSwap *shardSchemaSwap) applySeedSchemaChange() (err error) {
return err
}
updateAppliedSwapQuery := fmt.Sprintf(
"INSERT INTO _vt.local_metadata (name, value) VALUES ('%s', '%d') ON DUPLICATE KEY UPDATE value = '%d'",
lastAppliedMetadataName, shardSwap.parent.swapID, shardSwap.parent.swapID)
"INSERT INTO _vt.local_metadata (db_name, name, value) VALUES ('%s', '%s', '%d') ON DUPLICATE KEY UPDATE value = '%d'",
topoproto.TabletDbName(seedTablet), lastAppliedMetadataName, shardSwap.parent.swapID, shardSwap.parent.swapID)
_, err = shardSwap.parent.tabletClient.ExecuteFetchAsDba(
shardSwap.parent.ctx,
seedTablet,
Expand Down
21 changes: 12 additions & 9 deletions go/vt/vttablet/tabletmanager/action_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,6 @@ func NewActionAgent(
agent.statsTabletType = stats.NewString("TabletType")
agent.statsTabletTypeCount = stats.NewCountersWithSingleLabel("TabletTypeCount", "Number of times the tablet changed to the labeled type", "type")

// The db name will get set by the Start function called below, before
// VREngine gets to invoke the FilteredWithDB call.
agent.VREngine = vreplication.NewEngine(ts, tabletAlias.Cell, mysqld, func() binlogplayer.DBClient {
return binlogplayer.NewDBClient(agent.DBConfigs.FilteredWithDB())
})
servenv.OnTerm(agent.VREngine.Close)

var mysqlHost string
var mysqlPort int32
if appConfig := dbcfgs.AppWithDB(); appConfig.Host != "" {
Expand All @@ -289,6 +282,12 @@ func NewActionAgent(
return nil, err
}

// The db name is set by the Start function called above
agent.VREngine = vreplication.NewEngine(ts, tabletAlias.Cell, mysqld, func() binlogplayer.DBClient {
return binlogplayer.NewDBClient(agent.DBConfigs.FilteredWithDB())
}, agent.DBConfigs.FilteredWithDB().DbName)
servenv.OnTerm(agent.VREngine.Close)

// Run a background task to rebuild the SrvKeyspace in our cell/keyspace
// if it doesn't exist yet.
go agent.maybeRebuildKeyspace(agent.initialTablet.Alias.Cell, agent.initialTablet.Keyspace)
Expand Down Expand Up @@ -340,6 +339,10 @@ func NewActionAgent(
// NewTestActionAgent creates an agent for test purposes. Only a
// subset of features are supported now, but we'll add more over time.
func NewTestActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, vtPort, grpcPort int32, mysqlDaemon mysqlctl.MysqlDaemon, preStart func(*ActionAgent)) *ActionAgent {
ti, err := ts.GetTablet(batchCtx, tabletAlias)
if err != nil {
panic(vterrors.Wrap(err, "failed reading tablet"))
}
agent := &ActionAgent{
QueryServiceControl: tabletservermock.NewController(),
UpdateStream: binlog.NewUpdateStreamControlMock(),
Expand All @@ -350,7 +353,7 @@ func NewTestActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias *
Cnf: nil,
MysqlDaemon: mysqlDaemon,
DBConfigs: &dbconfigs.DBConfigs{},
VREngine: vreplication.NewEngine(ts, tabletAlias.Cell, mysqlDaemon, binlogplayer.NewFakeDBClient),
VREngine: vreplication.NewEngine(ts, tabletAlias.Cell, mysqlDaemon, binlogplayer.NewFakeDBClient, ti.DbName()),
History: history.New(historyLength),
_healthy: fmt.Errorf("healthcheck not run yet"),
}
Expand Down Expand Up @@ -389,7 +392,7 @@ func NewComboActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias
Cnf: nil,
MysqlDaemon: mysqlDaemon,
DBConfigs: dbcfgs,
VREngine: vreplication.NewEngine(nil, "", nil, nil),
VREngine: vreplication.NewEngine(nil, "", nil, nil, ""),
gotMysqlPort: true,
History: history.New(historyLength),
_healthy: fmt.Errorf("healthcheck not run yet"),
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/init_tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
if *initPopulateMetadata {
agent.setTablet(tablet)
localMetadata := agent.getLocalMetadataValues(tablet.Type)
err := mysqlctl.PopulateMetadataTables(agent.MysqlDaemon, localMetadata)
err := mysqlctl.PopulateMetadataTables(agent.MysqlDaemon, localMetadata, topoproto.TabletDbName(tablet))
if err != nil {
return vterrors.Wrap(err, "failed to -init_populate_metadata")
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/init_tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestInitTablet(t *testing.T) {
}
db := fakesqldb.New(t)
defer db.Close()
db.AddQueryPattern(`(SET|CREATE|BEGIN|INSERT|COMMIT)\b.*`, &sqltypes.Result{})
db.AddQueryPattern(`(SET|CREATE|BEGIN|INSERT|COMMIT|ALTER|UPDATE)\b.*`, &sqltypes.Result{})
/*
db.AddQuery("SET @@session.sql_log_bin = 0", &sqltypes.Result{})
db.AddQuery("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{})
Expand All @@ -194,7 +194,7 @@ func TestInitTablet(t *testing.T) {
TabletAlias: tabletAlias,
MysqlDaemon: mysqlDaemon,
DBConfigs: &dbconfigs.DBConfigs{},
VREngine: vreplication.NewEngine(nil, "", nil, nil),
VREngine: vreplication.NewEngine(nil, "", nil, nil, ""),
batchCtx: ctx,
History: history.New(historyLength),
_healthy: fmt.Errorf("healthcheck not run yet"),
Expand Down
Loading