Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/cmd/vttablet/vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func main() {
QueryServiceControl: qsc,
UpdateStream: binlog.NewUpdateStream(ts, tablet.Keyspace, tabletAlias.Cell, qsc.SchemaEngine()),
VREngine: vreplication.NewEngine(config, ts, tabletAlias.Cell, mysqld, qsc.LagThrottler()),
MetadataManager: &mysqlctl.MetadataManager{},
}
if err := tm.Start(tablet, config.Healthcheck.IntervalSeconds.Get()); err != nil {
log.Exitf("failed to parse -tablet-path or initialize DB credentials: %v", err)
Expand Down
141 changes: 128 additions & 13 deletions go/vt/mysqlctl/metadata_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
)

Expand Down Expand Up @@ -58,20 +59,44 @@ var (
}
)

// PopulateMetadataTables creates and fills the _vt.local_metadata table and
// creates _vt.shard_metadata table. _vt.local_metadata table is
// a per-tablet table that is never replicated. This allows queries
// against local_metadata to return different values on different tablets,
// which is used for communicating between Vitess and MySQL-level tools like
// Orchestrator (https://github.com/openark/orchestrator).
// MetadataManager manages the creation and filling of the _vt.local_metadata
// and _vt.shard_metadata tables.
type MetadataManager struct{}

// CreateMetadataTables creates the metadata tables. See the package-level
// function for more details.
func (m *MetadataManager) CreateMetadataTables(mysqld MysqlDaemon, dbName string) error {
return CreateMetadataTables(mysqld, dbName)
}

// PopulateMetadataTables creates and fills the metadata tables. See the
// package-level function for more details.
func (m *MetadataManager) PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error {
return PopulateMetadataTables(mysqld, localMetadata, dbName)
}

// UpsertLocalMetadata adds the given metadata map to the _vt.local_metadata
// table, updating any duplicate rows to the values in the map. See the package-
// level function for more details.
func (m *MetadataManager) UpsertLocalMetadata(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error {
return UpsertLocalMetadata(mysqld, localMetadata, dbName)
}

// CreateMetadataTables creates the _vt.local_metadata and _vt.shard_metadata
// tables.
//
// _vt.local_metadata table is a per-tablet table that is never replicated.
// This allows queries against local_metadata to return different values on
// different tablets, which is used for communicating between Vitess and
// MySQL-level tools like Orchestrator (https://github.com/openark/orchestrator).
//
// _vt.shard_metadata is a replicated table with per-shard information, but it's
// created here to make it easier to create it on databases that were running
// old version of Vitess, or databases that are getting converted to run under
// Vitess.
func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error {
log.Infof("Populating _vt.local_metadata table...")
func CreateMetadataTables(mysqld MysqlDaemon, dbName string) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this function is only called locally, rename it to createMetadataTables()?

log.Infof("Creating _vt.local_metadata and _vt.shard_metadata tables ...")

// Get a non-pooled DBA connection.
conn, err := mysqld.GetDbaConnection(context.TODO())
if err != nil {
return err
Expand All @@ -84,13 +109,30 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string,
return err
}

// Create the database and table if necessary.
return createMetadataTables(conn, dbName)
}

func createMetadataTables(conn *dbconnpool.DBConnection, dbName string) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... following up on my previous comment, we now have one method CreateMetadataTables, one function CreateMetadataTables, one function createMetadataTables. Let's try and make these less ambiguous by renaming the latter two?

Copy link
Copy Markdown
Contributor Author

@ajm188 ajm188 May 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, how about this:

  • MetadataManager has public methods: {Create,Populate}MetadataTables and UpsertMetadataTables
  • The main reason I did this "public function and also public method on struct" thing was to reduce the number of changes I needed to do (because otherwise I would have to update everyone that called mysqlctl.PopulateMetadataTables .... which now that i look is just two places in mysqlctl/backup.go so maybe I should just change them 🤔 )
  • The other reason for the public/private with the same name was to allow code reuse without having to get new dba connections for each stage, so in Populate, we first get a connection, and then call private create and private upsert, whereas if we called public Create and public Upsert then we would get a new connection for each of those.

Then, we do:

type MetadataManager struct {}

func (m *MetadataManager) PopulateMetadataTables(mysqld MysqldDaemon, dbName string) error {
    log.Infof("....")
    conn, err := mysqld.GetDbaConnection(...)
    if err != nil {
        return err
    }
    defer conn.Close()
    
    return m.populateMetadataTablesWithConn(conn, dbName)
 }

func PopulateMetadataTables(mysqld MysqldDaemon, localMetadata map[string]string, dbName string) error {
    m := &MetadataManager{}
    return m.PopulateMetadataTables(mysqld, localMetadata, dbName)
}

and then the private function does all the logic, and has a slightly cleaner naming (due to the "WithConn" suffix, or at least that's my goal!). And if we want, I can update those last few callsites and remove the package-level public function as well.

What do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. We can do so iteratively in followup PRs.

if _, err := conn.ExecuteFetch("CREATE DATABASE IF NOT EXISTS _vt", 0, false); err != nil {
return err
}

if err := createLocalMetadataTable(conn, dbName); err != nil {
return err
}

if err := createShardMetadataTable(conn, dbName); err != nil {
return err
}

return nil
}

func createLocalMetadataTable(conn *dbconnpool.DBConnection, dbName string) error {
if _, err := conn.ExecuteFetch(sqlCreateLocalMetadataTable, 0, false); err != nil {
return err
}

for _, sql := range sqlAlterLocalMetadataTable {
if _, err := conn.ExecuteFetch(sql, 0, false); err != nil {
// Ignore "Duplicate column name 'db_name'" errors which can happen on every restart.
Expand All @@ -100,13 +142,20 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string,
}
}
}

sql := fmt.Sprintf(sqlUpdateLocalMetadataTable, dbName)
if _, err := conn.ExecuteFetch(sql, 0, false); err != nil {
log.Errorf("Error executing %v: %v, continuing. Please check the data in _vt.local_metadata and take corrective action.", sql, err)
}

return nil
}

func createShardMetadataTable(conn *dbconnpool.DBConnection, dbName string) error {
if _, err := conn.ExecuteFetch(sqlCreateShardMetadataTable, 0, false); err != nil {
return err
}

for _, sql := range sqlAlterShardMetadataTable {
if _, err := conn.ExecuteFetch(sql, 0, false); err != nil {
// Ignore "Duplicate column name 'db_name'" errors which can happen on every restart.
Expand All @@ -116,11 +165,73 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string,
}
}
}
sql = fmt.Sprintf(sqlUpdateShardMetadataTable, dbName)

sql := fmt.Sprintf(sqlUpdateShardMetadataTable, dbName)
if _, err := conn.ExecuteFetch(sql, 0, false); err != nil {
log.Errorf("Error executing %v: %v, continuing. Please check the data in _vt.shard_metadata and take corrective action.", sql, err)
}

return nil
}

// PopulateMetadataTables creates and fills the _vt.local_metadata table and
// creates _vt.shard_metadata table.
//
// This function is semantically equivalent to calling CreateMetadataTables
// followed immediately by UpsertLocalMetadata.
func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error {
log.Infof("Populating _vt.local_metadata table...")

// Get a non-pooled DBA connection.
conn, err := mysqld.GetDbaConnection(context.TODO())
if err != nil {
return err
}
defer conn.Close()

// Disable replication on this session. We close the connection after using
// it, so there's no need to re-enable replication when we're done.
if _, err := conn.ExecuteFetch("SET @@session.sql_log_bin = 0", 0, false); err != nil {
return err
}

// Create the database and table if necessary.
if err := createMetadataTables(conn, dbName); err != nil {
return err
}

// Populate local_metadata from the passed list of values.
return upsertLocalMetadata(conn, localMetadata, dbName)
}

// UpsertLocalMetadata adds the given metadata map to the _vt.local_metadata
// table, updating any rows that exist for a given `_vt.local_metadata.name`
// with the map value. The session that performs these upserts sets
// sql_log_bin=0, as the _vt.local_metadata table is meant to never be
// replicated.
//
// Callers are responsible for ensuring the _vt.local_metadata table exists
// before calling this function, usually by calling CreateMetadataTables at
// least once prior.
func UpsertLocalMetadata(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function is only called locally, reduce its visibility by lowercasing it?

log.Infof("Upserting _vt.local_metadata ...")

conn, err := mysqld.GetDbaConnection(context.TODO())
if err != nil {
return err
}
defer conn.Close()

// Disable replication on this session. We close the connection after using
// it, so there's no need to re-enable replication when we're done.
if _, err := conn.ExecuteFetch("SET @@session.sql_log_bin = 0", 0, false); err != nil {
return err
}

return upsertLocalMetadata(conn, localMetadata, dbName)
}

func upsertLocalMetadata(conn *dbconnpool.DBConnection, localMetadata map[string]string, dbName string) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similarly, we have three different functions now called upsertLocalMetadata with different capitalization. Let's change names to avoid confusion.

// Populate local_metadata from the passed list of values.
if _, err := conn.ExecuteFetch("BEGIN", 0, false); err != nil {
return err
Expand All @@ -144,6 +255,10 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string,
return err
}
}
_, err = conn.ExecuteFetch("COMMIT", 0, false)
return err

if _, err := conn.ExecuteFetch("COMMIT", 0, false); err != nil {
return err
}

return nil
}
14 changes: 11 additions & 3 deletions go/vt/vttablet/tabletmanager/tm_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ type TabletManager struct {
UpdateStream binlog.UpdateStreamControl
VREngine *vreplication.Engine

// MetadataManager manages the local metadata tables for a tablet. It
// exists, and is exported, to support swapping a nil pointer in test code,
// in which case metadata creation/population is skipped.
MetadataManager *mysqlctl.MetadataManager

// tmState manages the TabletManager state.
tmState *tmState

Expand Down Expand Up @@ -660,9 +665,12 @@ func (tm *TabletManager) handleRestore(ctx context.Context) (bool, error) {
return false, err
}
}
err := mysqlctl.PopulateMetadataTables(tm.MysqlDaemon, localMetadata, topoproto.TabletDbName(tablet))
if err != nil {
return false, vterrors.Wrap(err, "failed to -init_populate_metadata")

if tm.MetadataManager != nil {
err := tm.MetadataManager.PopulateMetadataTables(tm.MysqlDaemon, localMetadata, topoproto.TabletDbName(tablet))
if err != nil {
return false, vterrors.Wrap(err, "failed to -init_populate_metadata")
}
}
}
return false, nil
Expand Down
35 changes: 32 additions & 3 deletions go/vt/vttablet/tabletmanager/tm_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -335,7 +337,7 @@ func TestStartCheckMysql(t *testing.T) {
tm := &TabletManager{
BatchCtx: context.Background(),
TopoServer: ts,
MysqlDaemon: &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(-1)},
MysqlDaemon: newTestMysqlDaemon(t, 1),
DBConfigs: dbconfigs.NewTestDBConfigs(cp, cp, ""),
QueryServiceControl: tabletservermock.NewController(),
}
Expand All @@ -357,7 +359,7 @@ func TestStartFindMysqlPort(t *testing.T) {
cell := "cell1"
ts := memorytopo.NewServer(cell)
tablet := newTestTablet(t, 1, "ks", "0")
fmd := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(-1)}
fmd := newTestMysqlDaemon(t, -1)
tm := &TabletManager{
BatchCtx: context.Background(),
TopoServer: ts,
Expand Down Expand Up @@ -504,14 +506,41 @@ func TestCheckTabletTypeResets(t *testing.T) {
tm.Stop()
}

func newTestMysqlDaemon(t *testing.T, port int32) *fakemysqldaemon.FakeMysqlDaemon {
t.Helper()

db := fakesqldb.New(t)
db.AddQueryPattern("SET @@.*", &sqltypes.Result{})
db.AddQueryPattern("BEGIN", &sqltypes.Result{})
db.AddQueryPattern("COMMIT", &sqltypes.Result{})

db.AddQueryPattern("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{})
db.AddQueryPattern("CREATE TABLE IF NOT EXISTS _vt\\.(local|shard)_metadata.*", &sqltypes.Result{})

db.AddQueryPattern("ALTER TABLE _vt\\.local_metadata ADD COLUMN (db_name).*", &sqltypes.Result{})
db.AddQueryPattern("ALTER TABLE _vt\\.local_metadata DROP PRIMARY KEY, ADD PRIMARY KEY\\(name, db_name\\)", &sqltypes.Result{})
db.AddQueryPattern("ALTER TABLE _vt\\.local_metadata CHANGE value.*", &sqltypes.Result{})

db.AddQueryPattern("ALTER TABLE _vt\\.shard_metadata ADD COLUMN (db_name).*", &sqltypes.Result{})
db.AddQueryPattern("ALTER TABLE _vt\\.shard_metadata DROP PRIMARY KEY, ADD PRIMARY KEY\\(name, db_name\\)", &sqltypes.Result{})

db.AddQueryPattern("UPDATE _vt\\.(local|shard)_metadata SET db_name='.+' WHERE db_name=''", &sqltypes.Result{})
db.AddQueryPattern("INSERT INTO _vt\\.local_metadata \\(.+\\) VALUES \\(.+\\) ON DUPLICATE KEY UPDATE value ?= ?'.+'.*", &sqltypes.Result{})

mysqld := fakemysqldaemon.NewFakeMysqlDaemon(db)
mysqld.MysqlPort = sync2.NewAtomicInt32(port)

return mysqld
}

func newTestTM(t *testing.T, ts *topo.Server, uid int, keyspace, shard string) *TabletManager {
t.Helper()
ctx := context.Background()
tablet := newTestTablet(t, uid, keyspace, shard)
tm := &TabletManager{
BatchCtx: ctx,
TopoServer: ts,
MysqlDaemon: &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(1)},
MysqlDaemon: newTestMysqlDaemon(t, 1),
DBConfigs: &dbconfigs.DBConfigs{},
QueryServiceControl: tabletservermock.NewController(),
}
Expand Down
52 changes: 43 additions & 9 deletions go/vt/vttablet/tabletmanager/tm_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,17 @@ type tmState struct {
// Because mu can be held for long, we publish the current state
// of these variables into displayState, which can be accessed
// more freely even while tmState is busy transitioning.
mu sync.Mutex
isOpen bool
isResharding bool
isInSrvKeyspace bool
isShardServing map[topodatapb.TabletType]bool
tabletControls map[topodatapb.TabletType]bool
blacklistedTables map[topodatapb.TabletType][]string
tablet *topodatapb.Tablet
isPublishing bool
mu sync.Mutex
isOpen bool
isOpening bool
isResharding bool
isInSrvKeyspace bool
isShardServing map[topodatapb.TabletType]bool
tabletControls map[topodatapb.TabletType]bool
blacklistedTables map[topodatapb.TabletType][]string
tablet *topodatapb.Tablet
isPublishing bool
hasCreatedMetadataTables bool

// displayState contains the current snapshot of the internal state
// and has its own mutex.
Expand Down Expand Up @@ -95,7 +97,9 @@ func (ts *tmState) Open() {
}

ts.isOpen = true
ts.isOpening = true
ts.updateLocked(ts.ctx)
ts.isOpening = false
ts.publishStateLocked(ts.ctx)
}

Expand Down Expand Up @@ -235,6 +239,7 @@ func (ts *tmState) updateLocked(ctx context.Context) {
// before other services are shutdown.
reason := ts.canServe(ts.tablet.Type)
if reason != "" {
ts.populateLocalMetadataLocked()
log.Infof("Disabling query service: %v", reason)
if err := ts.tm.QueryServiceControl.SetServingType(ts.tablet.Type, terTime, false, reason); err != nil {
log.Errorf("SetServingType(serving=false) failed: %v", err)
Expand Down Expand Up @@ -276,6 +281,35 @@ func (ts *tmState) updateLocked(ctx context.Context) {
if err := ts.tm.QueryServiceControl.SetServingType(ts.tablet.Type, terTime, true, ""); err != nil {
log.Errorf("Cannot start query service: %v", err)
}

ts.populateLocalMetadataLocked()
}
}

func (ts *tmState) populateLocalMetadataLocked() {
if ts.tm.MetadataManager == nil {
return
}

if ts.isOpening && !*initPopulateMetadata {
return
}

localMetadata := ts.tm.getLocalMetadataValues(ts.tablet.Type)
dbName := topoproto.TabletDbName(ts.tablet)

if !ts.hasCreatedMetadataTables {
if err := ts.tm.MetadataManager.PopulateMetadataTables(ts.tm.MysqlDaemon, localMetadata, dbName); err != nil {
log.Errorf("PopulateMetadataTables(%v) failed: %v", localMetadata, err)
return
}

ts.hasCreatedMetadataTables = true
return
}

if err := ts.tm.MetadataManager.UpsertLocalMetadata(ts.tm.MysqlDaemon, localMetadata, dbName); err != nil {
log.Errorf("UpsertMetadataTables(%v) failed: %v", localMetadata, err)
}
}

Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/tm_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func TestStateIsShardServingisInSrvKeyspace(t *testing.T) {

tm.tmState.mu.Lock()
tm.tmState.tablet.Type = topodatapb.TabletType_MASTER
tm.tmState.updateLocked(ctx)
tm.tmState.mu.Unlock()

leftKeyRange, err := key.ParseShardingSpec("-80")
Expand Down