diff --git a/go/cmd/vttablet/vttablet.go b/go/cmd/vttablet/vttablet.go index d8777ce84f5..1d6838c3e9b 100644 --- a/go/cmd/vttablet/vttablet.go +++ b/go/cmd/vttablet/vttablet.go @@ -104,6 +104,7 @@ func main() { QueryServiceControl: qsc, UpdateStream: binlog.NewUpdateStream(ts, tablet.Keyspace, tabletAlias.Cell, qsc.SchemaEngine()), VREngine: vreplication.NewEngine(config, ts, tabletAlias.Cell, mysqld, qsc.LagThrottler()), + MetadataManager: &mysqlctl.MetadataManager{}, } if err := tm.Start(tablet, config.Healthcheck.IntervalSeconds.Get()); err != nil { log.Exitf("failed to parse -tablet-path or initialize DB credentials: %v", err) diff --git a/go/vt/mysqlctl/metadata_tables.go b/go/vt/mysqlctl/metadata_tables.go index 72deaac7f1b..6b9f5413911 100644 --- a/go/vt/mysqlctl/metadata_tables.go +++ b/go/vt/mysqlctl/metadata_tables.go @@ -24,6 +24,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/dbconnpool" "vitess.io/vitess/go/vt/log" ) @@ -58,20 +59,44 @@ var ( } ) -// PopulateMetadataTables creates and fills the _vt.local_metadata table and -// creates _vt.shard_metadata table. _vt.local_metadata table is -// a per-tablet table that is never replicated. This allows queries -// against local_metadata to return different values on different tablets, -// which is used for communicating between Vitess and MySQL-level tools like -// Orchestrator (https://github.com/openark/orchestrator). +// MetadataManager manages the creation and filling of the _vt.local_metadata +// and _vt.shard_metadata tables. +type MetadataManager struct{} + +// CreateMetadataTables creates the metadata tables. See the package-level +// function for more details. +func (m *MetadataManager) CreateMetadataTables(mysqld MysqlDaemon, dbName string) error { + return CreateMetadataTables(mysqld, dbName) +} + +// PopulateMetadataTables creates and fills the metadata tables. See the +// package-level function for more details. +func (m *MetadataManager) PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error { + return PopulateMetadataTables(mysqld, localMetadata, dbName) +} + +// UpsertLocalMetadata adds the given metadata map to the _vt.local_metadata +// table, updating any duplicate rows to the values in the map. See the package- +// level function for more details. +func (m *MetadataManager) UpsertLocalMetadata(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error { + return UpsertLocalMetadata(mysqld, localMetadata, dbName) +} + +// CreateMetadataTables creates the _vt.local_metadata and _vt.shard_metadata +// tables. +// +// _vt.local_metadata table is a per-tablet table that is never replicated. +// This allows queries against local_metadata to return different values on +// different tablets, which is used for communicating between Vitess and +// MySQL-level tools like Orchestrator (https://github.com/openark/orchestrator). +// // _vt.shard_metadata is a replicated table with per-shard information, but it's // created here to make it easier to create it on databases that were running // old version of Vitess, or databases that are getting converted to run under // Vitess. -func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error { - log.Infof("Populating _vt.local_metadata table...") +func CreateMetadataTables(mysqld MysqlDaemon, dbName string) error { + log.Infof("Creating _vt.local_metadata and _vt.shard_metadata tables ...") - // Get a non-pooled DBA connection. conn, err := mysqld.GetDbaConnection(context.TODO()) if err != nil { return err @@ -84,13 +109,30 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string, return err } - // Create the database and table if necessary. + return createMetadataTables(conn, dbName) +} + +func createMetadataTables(conn *dbconnpool.DBConnection, dbName string) error { if _, err := conn.ExecuteFetch("CREATE DATABASE IF NOT EXISTS _vt", 0, false); err != nil { return err } + + if err := createLocalMetadataTable(conn, dbName); err != nil { + return err + } + + if err := createShardMetadataTable(conn, dbName); err != nil { + return err + } + + return nil +} + +func createLocalMetadataTable(conn *dbconnpool.DBConnection, dbName string) error { if _, err := conn.ExecuteFetch(sqlCreateLocalMetadataTable, 0, false); err != nil { return err } + for _, sql := range sqlAlterLocalMetadataTable { if _, err := conn.ExecuteFetch(sql, 0, false); err != nil { // Ignore "Duplicate column name 'db_name'" errors which can happen on every restart. @@ -100,13 +142,20 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string, } } } + sql := fmt.Sprintf(sqlUpdateLocalMetadataTable, dbName) if _, err := conn.ExecuteFetch(sql, 0, false); err != nil { log.Errorf("Error executing %v: %v, continuing. Please check the data in _vt.local_metadata and take corrective action.", sql, err) } + + return nil +} + +func createShardMetadataTable(conn *dbconnpool.DBConnection, dbName string) error { if _, err := conn.ExecuteFetch(sqlCreateShardMetadataTable, 0, false); err != nil { return err } + for _, sql := range sqlAlterShardMetadataTable { if _, err := conn.ExecuteFetch(sql, 0, false); err != nil { // Ignore "Duplicate column name 'db_name'" errors which can happen on every restart. @@ -116,11 +165,73 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string, } } } - sql = fmt.Sprintf(sqlUpdateShardMetadataTable, dbName) + + sql := fmt.Sprintf(sqlUpdateShardMetadataTable, dbName) if _, err := conn.ExecuteFetch(sql, 0, false); err != nil { log.Errorf("Error executing %v: %v, continuing. Please check the data in _vt.shard_metadata and take corrective action.", sql, err) } + return nil +} + +// PopulateMetadataTables creates and fills the _vt.local_metadata table and +// creates _vt.shard_metadata table. +// +// This function is semantically equivalent to calling CreateMetadataTables +// followed immediately by UpsertLocalMetadata. +func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error { + log.Infof("Populating _vt.local_metadata table...") + + // Get a non-pooled DBA connection. + conn, err := mysqld.GetDbaConnection(context.TODO()) + if err != nil { + return err + } + defer conn.Close() + + // Disable replication on this session. We close the connection after using + // it, so there's no need to re-enable replication when we're done. + if _, err := conn.ExecuteFetch("SET @@session.sql_log_bin = 0", 0, false); err != nil { + return err + } + + // Create the database and table if necessary. + if err := createMetadataTables(conn, dbName); err != nil { + return err + } + + // Populate local_metadata from the passed list of values. + return upsertLocalMetadata(conn, localMetadata, dbName) +} + +// UpsertLocalMetadata adds the given metadata map to the _vt.local_metadata +// table, updating any rows that exist for a given `_vt.local_metadata.name` +// with the map value. The session that performs these upserts sets +// sql_log_bin=0, as the _vt.local_metadata table is meant to never be +// replicated. +// +// Callers are responsible for ensuring the _vt.local_metadata table exists +// before calling this function, usually by calling CreateMetadataTables at +// least once prior. +func UpsertLocalMetadata(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error { + log.Infof("Upserting _vt.local_metadata ...") + + conn, err := mysqld.GetDbaConnection(context.TODO()) + if err != nil { + return err + } + defer conn.Close() + + // Disable replication on this session. We close the connection after using + // it, so there's no need to re-enable replication when we're done. + if _, err := conn.ExecuteFetch("SET @@session.sql_log_bin = 0", 0, false); err != nil { + return err + } + + return upsertLocalMetadata(conn, localMetadata, dbName) +} + +func upsertLocalMetadata(conn *dbconnpool.DBConnection, localMetadata map[string]string, dbName string) error { // Populate local_metadata from the passed list of values. if _, err := conn.ExecuteFetch("BEGIN", 0, false); err != nil { return err @@ -144,6 +255,10 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string, return err } } - _, err = conn.ExecuteFetch("COMMIT", 0, false) - return err + + if _, err := conn.ExecuteFetch("COMMIT", 0, false); err != nil { + return err + } + + return nil } diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index db38b091ea2..75d86c8b0e2 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -133,6 +133,11 @@ type TabletManager struct { UpdateStream binlog.UpdateStreamControl VREngine *vreplication.Engine + // MetadataManager manages the local metadata tables for a tablet. It + // exists, and is exported, to support swapping a nil pointer in test code, + // in which case metadata creation/population is skipped. + MetadataManager *mysqlctl.MetadataManager + // tmState manages the TabletManager state. tmState *tmState @@ -660,9 +665,12 @@ func (tm *TabletManager) handleRestore(ctx context.Context) (bool, error) { return false, err } } - err := mysqlctl.PopulateMetadataTables(tm.MysqlDaemon, localMetadata, topoproto.TabletDbName(tablet)) - if err != nil { - return false, vterrors.Wrap(err, "failed to -init_populate_metadata") + + if tm.MetadataManager != nil { + err := tm.MetadataManager.PopulateMetadataTables(tm.MysqlDaemon, localMetadata, topoproto.TabletDbName(tablet)) + if err != nil { + return false, vterrors.Wrap(err, "failed to -init_populate_metadata") + } } } return false, nil diff --git a/go/vt/vttablet/tabletmanager/tm_init_test.go b/go/vt/vttablet/tabletmanager/tm_init_test.go index 0875a5f743d..20c2e8b12e4 100644 --- a/go/vt/vttablet/tabletmanager/tm_init_test.go +++ b/go/vt/vttablet/tabletmanager/tm_init_test.go @@ -26,6 +26,8 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/logutil" @@ -335,7 +337,7 @@ func TestStartCheckMysql(t *testing.T) { tm := &TabletManager{ BatchCtx: context.Background(), TopoServer: ts, - MysqlDaemon: &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(-1)}, + MysqlDaemon: newTestMysqlDaemon(t, 1), DBConfigs: dbconfigs.NewTestDBConfigs(cp, cp, ""), QueryServiceControl: tabletservermock.NewController(), } @@ -357,7 +359,7 @@ func TestStartFindMysqlPort(t *testing.T) { cell := "cell1" ts := memorytopo.NewServer(cell) tablet := newTestTablet(t, 1, "ks", "0") - fmd := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(-1)} + fmd := newTestMysqlDaemon(t, -1) tm := &TabletManager{ BatchCtx: context.Background(), TopoServer: ts, @@ -504,6 +506,33 @@ func TestCheckTabletTypeResets(t *testing.T) { tm.Stop() } +func newTestMysqlDaemon(t *testing.T, port int32) *fakemysqldaemon.FakeMysqlDaemon { + t.Helper() + + db := fakesqldb.New(t) + db.AddQueryPattern("SET @@.*", &sqltypes.Result{}) + db.AddQueryPattern("BEGIN", &sqltypes.Result{}) + db.AddQueryPattern("COMMIT", &sqltypes.Result{}) + + db.AddQueryPattern("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{}) + db.AddQueryPattern("CREATE TABLE IF NOT EXISTS _vt\\.(local|shard)_metadata.*", &sqltypes.Result{}) + + db.AddQueryPattern("ALTER TABLE _vt\\.local_metadata ADD COLUMN (db_name).*", &sqltypes.Result{}) + db.AddQueryPattern("ALTER TABLE _vt\\.local_metadata DROP PRIMARY KEY, ADD PRIMARY KEY\\(name, db_name\\)", &sqltypes.Result{}) + db.AddQueryPattern("ALTER TABLE _vt\\.local_metadata CHANGE value.*", &sqltypes.Result{}) + + db.AddQueryPattern("ALTER TABLE _vt\\.shard_metadata ADD COLUMN (db_name).*", &sqltypes.Result{}) + db.AddQueryPattern("ALTER TABLE _vt\\.shard_metadata DROP PRIMARY KEY, ADD PRIMARY KEY\\(name, db_name\\)", &sqltypes.Result{}) + + db.AddQueryPattern("UPDATE _vt\\.(local|shard)_metadata SET db_name='.+' WHERE db_name=''", &sqltypes.Result{}) + db.AddQueryPattern("INSERT INTO _vt\\.local_metadata \\(.+\\) VALUES \\(.+\\) ON DUPLICATE KEY UPDATE value ?= ?'.+'.*", &sqltypes.Result{}) + + mysqld := fakemysqldaemon.NewFakeMysqlDaemon(db) + mysqld.MysqlPort = sync2.NewAtomicInt32(port) + + return mysqld +} + func newTestTM(t *testing.T, ts *topo.Server, uid int, keyspace, shard string) *TabletManager { t.Helper() ctx := context.Background() @@ -511,7 +540,7 @@ func newTestTM(t *testing.T, ts *topo.Server, uid int, keyspace, shard string) * tm := &TabletManager{ BatchCtx: ctx, TopoServer: ts, - MysqlDaemon: &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(1)}, + MysqlDaemon: newTestMysqlDaemon(t, 1), DBConfigs: &dbconfigs.DBConfigs{}, QueryServiceControl: tabletservermock.NewController(), } diff --git a/go/vt/vttablet/tabletmanager/tm_state.go b/go/vt/vttablet/tabletmanager/tm_state.go index 1eb27d99cda..41b864bb013 100644 --- a/go/vt/vttablet/tabletmanager/tm_state.go +++ b/go/vt/vttablet/tabletmanager/tm_state.go @@ -59,15 +59,17 @@ type tmState struct { // Because mu can be held for long, we publish the current state // of these variables into displayState, which can be accessed // more freely even while tmState is busy transitioning. - mu sync.Mutex - isOpen bool - isResharding bool - isInSrvKeyspace bool - isShardServing map[topodatapb.TabletType]bool - tabletControls map[topodatapb.TabletType]bool - blacklistedTables map[topodatapb.TabletType][]string - tablet *topodatapb.Tablet - isPublishing bool + mu sync.Mutex + isOpen bool + isOpening bool + isResharding bool + isInSrvKeyspace bool + isShardServing map[topodatapb.TabletType]bool + tabletControls map[topodatapb.TabletType]bool + blacklistedTables map[topodatapb.TabletType][]string + tablet *topodatapb.Tablet + isPublishing bool + hasCreatedMetadataTables bool // displayState contains the current snapshot of the internal state // and has its own mutex. @@ -95,7 +97,9 @@ func (ts *tmState) Open() { } ts.isOpen = true + ts.isOpening = true ts.updateLocked(ts.ctx) + ts.isOpening = false ts.publishStateLocked(ts.ctx) } @@ -235,6 +239,7 @@ func (ts *tmState) updateLocked(ctx context.Context) { // before other services are shutdown. reason := ts.canServe(ts.tablet.Type) if reason != "" { + ts.populateLocalMetadataLocked() log.Infof("Disabling query service: %v", reason) if err := ts.tm.QueryServiceControl.SetServingType(ts.tablet.Type, terTime, false, reason); err != nil { log.Errorf("SetServingType(serving=false) failed: %v", err) @@ -276,6 +281,35 @@ func (ts *tmState) updateLocked(ctx context.Context) { if err := ts.tm.QueryServiceControl.SetServingType(ts.tablet.Type, terTime, true, ""); err != nil { log.Errorf("Cannot start query service: %v", err) } + + ts.populateLocalMetadataLocked() + } +} + +func (ts *tmState) populateLocalMetadataLocked() { + if ts.tm.MetadataManager == nil { + return + } + + if ts.isOpening && !*initPopulateMetadata { + return + } + + localMetadata := ts.tm.getLocalMetadataValues(ts.tablet.Type) + dbName := topoproto.TabletDbName(ts.tablet) + + if !ts.hasCreatedMetadataTables { + if err := ts.tm.MetadataManager.PopulateMetadataTables(ts.tm.MysqlDaemon, localMetadata, dbName); err != nil { + log.Errorf("PopulateMetadataTables(%v) failed: %v", localMetadata, err) + return + } + + ts.hasCreatedMetadataTables = true + return + } + + if err := ts.tm.MetadataManager.UpsertLocalMetadata(ts.tm.MysqlDaemon, localMetadata, dbName); err != nil { + log.Errorf("UpsertMetadataTables(%v) failed: %v", localMetadata, err) } } diff --git a/go/vt/vttablet/tabletmanager/tm_state_test.go b/go/vt/vttablet/tabletmanager/tm_state_test.go index 80566394a14..1c837714faa 100644 --- a/go/vt/vttablet/tabletmanager/tm_state_test.go +++ b/go/vt/vttablet/tabletmanager/tm_state_test.go @@ -163,6 +163,7 @@ func TestStateIsShardServingisInSrvKeyspace(t *testing.T) { tm.tmState.mu.Lock() tm.tmState.tablet.Type = topodatapb.TabletType_MASTER + tm.tmState.updateLocked(ctx) tm.tmState.mu.Unlock() leftKeyRange, err := key.ParseShardingSpec("-80")