Skip to content
This repository was archived by the owner on Dec 16, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/cmd/vttablet/vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func main() {
QueryServiceControl: qsc,
UpdateStream: binlog.NewUpdateStream(ts, tablet.Keyspace, tabletAlias.Cell, qsc.SchemaEngine()),
VREngine: vreplication.NewEngine(config, ts, tabletAlias.Cell, mysqld),
MetadataManager: &mysqlctl.MetadataManager{},
}
if err := tm.Start(tablet, config.Healthcheck.IntervalSeconds.Get()); err != nil {
log.Exitf("failed to parse -tablet-path or initialize DB credentials: %v", err)
Expand Down
141 changes: 128 additions & 13 deletions go/vt/mysqlctl/metadata_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
)

Expand Down Expand Up @@ -58,20 +59,44 @@ var (
}
)

// PopulateMetadataTables creates and fills the _vt.local_metadata table and
// creates _vt.shard_metadata table. _vt.local_metadata table is
// a per-tablet table that is never replicated. This allows queries
// against local_metadata to return different values on different tablets,
// which is used for communicating between Vitess and MySQL-level tools like
// Orchestrator (https://github.com/openark/orchestrator).
// MetadataManager manages the creation and filling of the _vt.local_metadata
// and _vt.shard_metadata tables.
type MetadataManager struct{}

// CreateMetadataTables creates the metadata tables. See the package-level
// function for more details.
func (m *MetadataManager) CreateMetadataTables(mysqld MysqlDaemon, dbName string) error {
return CreateMetadataTables(mysqld, dbName)
}

// PopulateMetadataTables creates and fills the metadata tables. See the
// package-level function for more details.
func (m *MetadataManager) PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error {
return PopulateMetadataTables(mysqld, localMetadata, dbName)
}

// UpsertLocalMetadata adds the given metadata map to the _vt.local_metadata
// table, updating any duplicate rows to the values in the map. See the package-
// level function for more details.
func (m *MetadataManager) UpsertLocalMetadata(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error {
return UpsertLocalMetadata(mysqld, localMetadata, dbName)
}

// CreateMetadataTables creates the _vt.local_metadata and _vt.shard_metadata
// tables.
//
// _vt.local_metadata table is a per-tablet table that is never replicated.
// This allows queries against local_metadata to return different values on
// different tablets, which is used for communicating between Vitess and
// MySQL-level tools like Orchestrator (https://github.com/openark/orchestrator).
//
// _vt.shard_metadata is a replicated table with per-shard information, but it's
// created here to make it easier to create it on databases that were running
// old version of Vitess, or databases that are getting converted to run under
// Vitess.
func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error {
log.Infof("Populating _vt.local_metadata table...")
func CreateMetadataTables(mysqld MysqlDaemon, dbName string) error {
log.Infof("Creating _vt.local_metadata and _vt.shard_metadata tables ...")

// Get a non-pooled DBA connection.
conn, err := mysqld.GetDbaConnection(context.TODO())
if err != nil {
return err
Expand All @@ -84,13 +109,30 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string,
return err
}

// Create the database and table if necessary.
return createMetadataTables(conn, dbName)
}

func createMetadataTables(conn *dbconnpool.DBConnection, dbName string) error {
if _, err := conn.ExecuteFetch("CREATE DATABASE IF NOT EXISTS _vt", 0, false); err != nil {
return err
}

if err := createLocalMetadataTable(conn, dbName); err != nil {
return err
}

if err := createShardMetadataTable(conn, dbName); err != nil {
return err
}

return nil
}

func createLocalMetadataTable(conn *dbconnpool.DBConnection, dbName string) error {
if _, err := conn.ExecuteFetch(sqlCreateLocalMetadataTable, 0, false); err != nil {
return err
}

for _, sql := range sqlAlterLocalMetadataTable {
if _, err := conn.ExecuteFetch(sql, 0, false); err != nil {
// Ignore "Duplicate column name 'db_name'" errors which can happen on every restart.
Expand All @@ -100,13 +142,20 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string,
}
}
}

sql := fmt.Sprintf(sqlUpdateLocalMetadataTable, dbName)
if _, err := conn.ExecuteFetch(sql, 0, false); err != nil {
log.Errorf("Error executing %v: %v, continuing. Please check the data in _vt.local_metadata and take corrective action.", sql, err)
}

return nil
}

func createShardMetadataTable(conn *dbconnpool.DBConnection, dbName string) error {
if _, err := conn.ExecuteFetch(sqlCreateShardMetadataTable, 0, false); err != nil {
return err
}

for _, sql := range sqlAlterShardMetadataTable {
if _, err := conn.ExecuteFetch(sql, 0, false); err != nil {
// Ignore "Duplicate column name 'db_name'" errors which can happen on every restart.
Expand All @@ -116,11 +165,73 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string,
}
}
}
sql = fmt.Sprintf(sqlUpdateShardMetadataTable, dbName)

sql := fmt.Sprintf(sqlUpdateShardMetadataTable, dbName)
if _, err := conn.ExecuteFetch(sql, 0, false); err != nil {
log.Errorf("Error executing %v: %v, continuing. Please check the data in _vt.shard_metadata and take corrective action.", sql, err)
}

return nil
}

// PopulateMetadataTables creates and fills the _vt.local_metadata table and
// creates _vt.shard_metadata table.
//
// This function is semantically equivalent to calling CreateMetadataTables
// followed immediately by UpsertLocalMetadata.
func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error {
log.Infof("Populating _vt.local_metadata table...")

// Get a non-pooled DBA connection.
conn, err := mysqld.GetDbaConnection(context.TODO())
if err != nil {
return err
}
defer conn.Close()

// Disable replication on this session. We close the connection after using
// it, so there's no need to re-enable replication when we're done.
if _, err := conn.ExecuteFetch("SET @@session.sql_log_bin = 0", 0, false); err != nil {
return err
}

// Create the database and table if necessary.
if err := createMetadataTables(conn, dbName); err != nil {
return err
}

// Populate local_metadata from the passed list of values.
return upsertLocalMetadata(conn, localMetadata, dbName)
}

// UpsertLocalMetadata adds the given metadata map to the _vt.local_metadata
// table, updating any rows that exist for a given `_vt.local_metadata.name`
// with the map value. The session that performs these upserts sets
// sql_log_bin=0, as the _vt.local_metadata table is meant to never be
// replicated.
//
// Callers are responsible for ensuring the _vt.local_metadata table exists
// before calling this function, usually by calling CreateMetadataTables at
// least once prior.
func UpsertLocalMetadata(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error {
log.Infof("Upserting _vt.local_metadata ...")

conn, err := mysqld.GetDbaConnection(context.TODO())
if err != nil {
return err
}
defer conn.Close()

// Disable replication on this session. We close the connection after using
// it, so there's no need to re-enable replication when we're done.
if _, err := conn.ExecuteFetch("SET @@session.sql_log_bin = 0", 0, false); err != nil {
return err
}

return upsertLocalMetadata(conn, localMetadata, dbName)
}

func upsertLocalMetadata(conn *dbconnpool.DBConnection, localMetadata map[string]string, dbName string) error {
// Populate local_metadata from the passed list of values.
if _, err := conn.ExecuteFetch("BEGIN", 0, false); err != nil {
return err
Expand All @@ -144,6 +255,10 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string,
return err
}
}
_, err = conn.ExecuteFetch("COMMIT", 0, false)
return err

if _, err := conn.ExecuteFetch("COMMIT", 0, false); err != nil {
return err
}

return nil
}
20 changes: 14 additions & 6 deletions go/vt/vttablet/tabletmanager/tm_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ var (
// statsBackupIsRunning is set to 1 (true) if a backup is running.
statsBackupIsRunning *stats.GaugesWithMultiLabels

// statsIsInSRVKeyspace is set to 1 (true), 0 (false) whether the tablet is in the serving keyspace
statsIsInSRVKeyspace *stats.Gauge
// statsIsInSrvKeyspace is set to 1 (true), 0 (false) whether the tablet is in the serving keyspace
statsIsInSrvKeyspace *stats.Gauge

statsKeyspace = stats.NewString("TabletKeyspace")
statsShard = stats.NewString("TabletShard")
Expand All @@ -118,7 +118,7 @@ func init() {
statsTabletType = stats.NewString("TabletType")
statsTabletTypeCount = stats.NewCountersWithSingleLabel("TabletTypeCount", "Number of times the tablet changed to the labeled type", "type")
statsBackupIsRunning = stats.NewGaugesWithMultiLabels("BackupIsRunning", "Whether a backup is running", []string{"mode"})
statsIsInSRVKeyspace = stats.NewGauge("IsInSRVKeyspace", "Whether the vttablet is in the serving keyspace (1 = true / 0 = false)")
statsIsInSrvKeyspace = stats.NewGauge("IsInSRVKeyspace", "Whether the vttablet is in the serving keyspace (1 = true / 0 = false)")
}

// TabletManager is the main class for the tablet manager.
Expand All @@ -133,6 +133,11 @@ type TabletManager struct {
UpdateStream binlog.UpdateStreamControl
VREngine *vreplication.Engine

// MetadataManager manages the local metadata tables for a tablet. It
// exists, and is exported, to support swapping a nil pointer in test code,
// in which case metadata creation/population is skipped.
MetadataManager *mysqlctl.MetadataManager

// tmState manages the TabletManager state.
tmState *tmState

Expand Down Expand Up @@ -628,9 +633,12 @@ func (tm *TabletManager) handleRestore(ctx context.Context) (bool, error) {
return false, err
}
}
err := mysqlctl.PopulateMetadataTables(tm.MysqlDaemon, localMetadata, topoproto.TabletDbName(tablet))
if err != nil {
return false, vterrors.Wrap(err, "failed to -init_populate_metadata")

if tm.MetadataManager != nil {
err := tm.MetadataManager.PopulateMetadataTables(tm.MysqlDaemon, localMetadata, topoproto.TabletDbName(tablet))
if err != nil {
return false, vterrors.Wrap(err, "failed to -init_populate_metadata")
}
}
}
return false, nil
Expand Down
35 changes: 32 additions & 3 deletions go/vt/vttablet/tabletmanager/tm_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -333,7 +335,7 @@ func TestStartCheckMysql(t *testing.T) {
tm := &TabletManager{
BatchCtx: context.Background(),
TopoServer: ts,
MysqlDaemon: &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(-1)},
MysqlDaemon: newTestMysqlDaemon(t, 1),
DBConfigs: dbconfigs.NewTestDBConfigs(cp, cp, ""),
QueryServiceControl: tabletservermock.NewController(),
}
Expand All @@ -355,7 +357,7 @@ func TestStartFindMysqlPort(t *testing.T) {
cell := "cell1"
ts := memorytopo.NewServer(cell)
tablet := newTestTablet(t, 1, "ks", "0")
fmd := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(-1)}
fmd := newTestMysqlDaemon(t, -1)
tm := &TabletManager{
BatchCtx: context.Background(),
TopoServer: ts,
Expand Down Expand Up @@ -502,14 +504,41 @@ func TestCheckTabletTypeResets(t *testing.T) {
tm.Stop()
}

func newTestMysqlDaemon(t *testing.T, port int32) *fakemysqldaemon.FakeMysqlDaemon {
t.Helper()

db := fakesqldb.New(t)
db.AddQueryPattern("SET @@.*", &sqltypes.Result{})
db.AddQueryPattern("BEGIN", &sqltypes.Result{})
db.AddQueryPattern("COMMIT", &sqltypes.Result{})

db.AddQueryPattern("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{})
db.AddQueryPattern("CREATE TABLE IF NOT EXISTS _vt\\.(local|shard)_metadata.*", &sqltypes.Result{})

db.AddQueryPattern("ALTER TABLE _vt\\.local_metadata ADD COLUMN (db_name).*", &sqltypes.Result{})
db.AddQueryPattern("ALTER TABLE _vt\\.local_metadata DROP PRIMARY KEY, ADD PRIMARY KEY\\(name, db_name\\)", &sqltypes.Result{})
db.AddQueryPattern("ALTER TABLE _vt\\.local_metadata CHANGE value.*", &sqltypes.Result{})

db.AddQueryPattern("ALTER TABLE _vt\\.shard_metadata ADD COLUMN (db_name).*", &sqltypes.Result{})
db.AddQueryPattern("ALTER TABLE _vt\\.shard_metadata DROP PRIMARY KEY, ADD PRIMARY KEY\\(name, db_name\\)", &sqltypes.Result{})

db.AddQueryPattern("UPDATE _vt\\.(local|shard)_metadata SET db_name='.+' WHERE db_name=''", &sqltypes.Result{})
db.AddQueryPattern("INSERT INTO _vt\\.local_metadata \\(.+\\) VALUES \\(.+\\) ON DUPLICATE KEY UPDATE value ?= ?'.+'.*", &sqltypes.Result{})

mysqld := fakemysqldaemon.NewFakeMysqlDaemon(db)
mysqld.MysqlPort = sync2.NewAtomicInt32(port)

return mysqld
}

func newTestTM(t *testing.T, ts *topo.Server, uid int, keyspace, shard string) *TabletManager {
t.Helper()
ctx := context.Background()
tablet := newTestTablet(t, uid, keyspace, shard)
tm := &TabletManager{
BatchCtx: ctx,
TopoServer: ts,
MysqlDaemon: &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(1)},
MysqlDaemon: newTestMysqlDaemon(t, 1),
DBConfigs: &dbconfigs.DBConfigs{},
QueryServiceControl: tabletservermock.NewController(),
}
Expand Down
Loading