Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func (fmd *FakeMysqlDaemon) CheckSuperQueryList() error {
}

// GetSchema is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) {
func (fmd *FakeMysqlDaemon) GetSchema(ctx context.Context, dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) {
if fmd.SchemaFunc != nil {
return fmd.SchemaFunc()
}
Expand All @@ -446,25 +446,25 @@ func (fmd *FakeMysqlDaemon) GetSchema(dbName string, tables, excludeTables []str
}

// GetColumns is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetColumns(dbName, table string) ([]*querypb.Field, []string, error) {
func (fmd *FakeMysqlDaemon) GetColumns(ctx context.Context, dbName, table string) ([]*querypb.Field, []string, error) {
return []*querypb.Field{}, []string{}, nil
}

// GetPrimaryKeyColumns is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetPrimaryKeyColumns(dbName, table string) ([]string, error) {
func (fmd *FakeMysqlDaemon) GetPrimaryKeyColumns(ctx context.Context, dbName, table string) ([]string, error) {
return []string{}, nil
}

// PreflightSchemaChange is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) PreflightSchemaChange(dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) {
func (fmd *FakeMysqlDaemon) PreflightSchemaChange(ctx context.Context, dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) {
if fmd.PreflightSchemaChangeResult == nil {
return nil, fmt.Errorf("no preflight result defined")
}
return fmd.PreflightSchemaChangeResult, nil
}

// ApplySchemaChange is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) ApplySchemaChange(dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) {
func (fmd *FakeMysqlDaemon) ApplySchemaChange(ctx context.Context, dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) {
if fmd.ApplySchemaChangeResult == nil {
return nil, fmt.Errorf("no apply schema defined")
}
Expand Down
11 changes: 6 additions & 5 deletions go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@ type MysqlDaemon interface {
// Promote makes the current server master. It will not change
// the read_only state of the server.
Promote(map[string]string) (mysql.Position, error)

// Schema related methods
GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error)
GetColumns(dbName, table string) ([]*querypb.Field, []string, error)
GetPrimaryKeyColumns(dbName, table string) ([]string, error)
PreflightSchemaChange(dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error)
ApplySchemaChange(dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error)
GetSchema(ctx context.Context, dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error)
GetColumns(ctx context.Context, dbName, table string) ([]*querypb.Field, []string, error)
GetPrimaryKeyColumns(ctx context.Context, dbName, table string) ([]string, error)
PreflightSchemaChange(ctx context.Context, dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error)
ApplySchemaChange(ctx context.Context, dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error)

// GetAppConnection returns a app connection to be able to talk to the database.
GetAppConnection(ctx context.Context) (*dbconnpool.PooledDBConnection, error)
Expand Down
41 changes: 22 additions & 19 deletions go/vt/mysqlctl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ func (mysqld *Mysqld) executeSchemaCommands(sql string) error {

// GetSchema returns the schema for database for tables listed in
// tables. If tables is empty, return the schema for all tables.
func (mysqld *Mysqld) GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) {
ctx := context.TODO()
func (mysqld *Mysqld) GetSchema(ctx context.Context, dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) {
sd := &tabletmanagerdatapb.SchemaDefinition{}
backtickDBName := sqlescape.EscapeID(dbName)

Expand Down Expand Up @@ -123,11 +122,11 @@ func (mysqld *Mysqld) GetSchema(dbName string, tables, excludeTables []string, i
td.Name = tableName
td.Schema = norm

td.Fields, td.Columns, err = mysqld.GetColumns(dbName, tableName)
td.Fields, td.Columns, err = mysqld.GetColumns(ctx, dbName, tableName)
if err != nil {
return nil, err
}
td.PrimaryKeyColumns, err = mysqld.GetPrimaryKeyColumns(dbName, tableName)
td.PrimaryKeyColumns, err = mysqld.GetPrimaryKeyColumns(ctx, dbName, tableName)
if err != nil {
return nil, err
}
Expand All @@ -147,8 +146,8 @@ func (mysqld *Mysqld) GetSchema(dbName string, tables, excludeTables []string, i

// ResolveTables returns a list of actual tables+views matching a list
// of regexps
func ResolveTables(mysqld MysqlDaemon, dbName string, tables []string) ([]string, error) {
sd, err := mysqld.GetSchema(dbName, tables, nil, true)
func ResolveTables(ctx context.Context, mysqld MysqlDaemon, dbName string, tables []string) ([]string, error) {
sd, err := mysqld.GetSchema(ctx, dbName, tables, nil, true)
if err != nil {
return nil, err
}
Expand All @@ -160,13 +159,15 @@ func ResolveTables(mysqld MysqlDaemon, dbName string, tables []string) ([]string
}

// GetColumns returns the columns of table.
func (mysqld *Mysqld) GetColumns(dbName, table string) ([]*querypb.Field, []string, error) {
conn, err := getPoolReconnect(context.TODO(), mysqld.dbaPool)
func (mysqld *Mysqld) GetColumns(ctx context.Context, dbName, table string) ([]*querypb.Field, []string, error) {
conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this only applies the Context to fetching the connection. If you want the actual query to respect Context, you can replace conn.ExecuteFetch() everywhere with mysqld.executeFetchContext().

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm...who should be doing that exactly?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any method of Mysqld that wants to respect Context should use mysqld.executeFetchContext() as a wrapper around conn.ExecuteFetch() instead of calling it directly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if err != nil {
return nil, nil, err
}
defer conn.Recycle()
qr, err := conn.ExecuteFetch(fmt.Sprintf("SELECT * FROM %s.%s WHERE 1=0", sqlescape.EscapeID(dbName), sqlescape.EscapeID(table)), 0, true)

sql := fmt.Sprintf("SELECT * FROM %s.%s WHERE 1=0", sqlescape.EscapeID(dbName), sqlescape.EscapeID(table))
qr, err := mysqld.executeFetchContext(ctx, conn, sql, 0, true)
if err != nil {
return nil, nil, err
}
Expand All @@ -179,13 +180,15 @@ func (mysqld *Mysqld) GetColumns(dbName, table string) ([]*querypb.Field, []stri
}

// GetPrimaryKeyColumns returns the primary key columns of table.
func (mysqld *Mysqld) GetPrimaryKeyColumns(dbName, table string) ([]string, error) {
conn, err := getPoolReconnect(context.TODO(), mysqld.dbaPool)
func (mysqld *Mysqld) GetPrimaryKeyColumns(ctx context.Context, dbName, table string) ([]string, error) {
conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
if err != nil {
return nil, err
}
defer conn.Recycle()
qr, err := conn.ExecuteFetch(fmt.Sprintf("SHOW INDEX FROM %s.%s", sqlescape.EscapeID(dbName), sqlescape.EscapeID(table)), 100, true)

sql := fmt.Sprintf("SHOW INDEX FROM %s.%s", sqlescape.EscapeID(dbName), sqlescape.EscapeID(table))
qr, err := mysqld.executeFetchContext(ctx, conn, sql, 100, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -231,11 +234,11 @@ func (mysqld *Mysqld) GetPrimaryKeyColumns(dbName, table string) ([]string, erro

// PreflightSchemaChange checks the schema changes in "changes" by applying them
// to an intermediate database that has the same schema as the target database.
func (mysqld *Mysqld) PreflightSchemaChange(dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) {
func (mysqld *Mysqld) PreflightSchemaChange(ctx context.Context, dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) {
results := make([]*tabletmanagerdatapb.SchemaChangeResult, len(changes))

// Get current schema from the real database.
originalSchema, err := mysqld.GetSchema(dbName, nil, nil, true)
originalSchema, err := mysqld.GetSchema(ctx, dbName, nil, nil, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -267,7 +270,7 @@ func (mysqld *Mysqld) PreflightSchemaChange(dbName string, changes []string) ([]

// For each change, record the schema before and after.
for i, change := range changes {
beforeSchema, err := mysqld.GetSchema("_vt_preflight", nil, nil, true)
beforeSchema, err := mysqld.GetSchema(ctx, "_vt_preflight", nil, nil, true)
if err != nil {
return nil, err
}
Expand All @@ -281,7 +284,7 @@ func (mysqld *Mysqld) PreflightSchemaChange(dbName string, changes []string) ([]
}

// get the result
afterSchema, err := mysqld.GetSchema("_vt_preflight", nil, nil, true)
afterSchema, err := mysqld.GetSchema(ctx, "_vt_preflight", nil, nil, true)
if err != nil {
return nil, err
}
Expand All @@ -300,9 +303,9 @@ func (mysqld *Mysqld) PreflightSchemaChange(dbName string, changes []string) ([]
}

// ApplySchemaChange will apply the schema change to the given database.
func (mysqld *Mysqld) ApplySchemaChange(dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) {
func (mysqld *Mysqld) ApplySchemaChange(ctx context.Context, dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) {
// check current schema matches
beforeSchema, err := mysqld.GetSchema(dbName, nil, nil, true)
beforeSchema, err := mysqld.GetSchema(ctx, dbName, nil, nil, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -349,7 +352,7 @@ func (mysqld *Mysqld) ApplySchemaChange(dbName string, change *tmutils.SchemaCha
}

// get AfterSchema
afterSchema, err := mysqld.GetSchema(dbName, nil, nil, true)
afterSchema, err := mysqld.GetSchema(ctx, dbName, nil, nil, true)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletmanager/rpc_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

// GetSchema returns the schema.
func (agent *ActionAgent) GetSchema(ctx context.Context, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) {
return agent.MysqlDaemon.GetSchema(topoproto.TabletDbName(agent.Tablet()), tables, excludeTables, includeViews)
return agent.MysqlDaemon.GetSchema(ctx, topoproto.TabletDbName(agent.Tablet()), tables, excludeTables, includeViews)
}

// ReloadSchema will reload the schema
Expand Down Expand Up @@ -69,7 +69,7 @@ func (agent *ActionAgent) PreflightSchema(ctx context.Context, changes []string)
dbName := topoproto.TabletDbName(agent.Tablet())

// and preflight the change
return agent.MysqlDaemon.PreflightSchemaChange(dbName, changes)
return agent.MysqlDaemon.PreflightSchemaChange(ctx, dbName, changes)
}

// ApplySchema will apply a schema change
Expand All @@ -83,7 +83,7 @@ func (agent *ActionAgent) ApplySchema(ctx context.Context, change *tmutils.Schem
dbName := topoproto.TabletDbName(agent.Tablet())

// apply the change
scr, err := agent.MysqlDaemon.ApplySchemaChange(dbName, change)
scr, err := agent.MysqlDaemon.ApplySchemaChange(ctx, dbName, change)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletmanager/state_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ var (
const blacklistQueryRules string = "BlacklistQueryRules"

// loadBlacklistRules loads and builds the blacklist query rules
func (agent *ActionAgent) loadBlacklistRules(tablet *topodatapb.Tablet, blacklistedTables []string) (err error) {
func (agent *ActionAgent) loadBlacklistRules(ctx context.Context, tablet *topodatapb.Tablet, blacklistedTables []string) (err error) {
blacklistRules := rules.New()
if len(blacklistedTables) > 0 {
// tables, first resolve wildcards
tables, err := mysqlctl.ResolveTables(agent.MysqlDaemon, topoproto.TabletDbName(tablet), blacklistedTables)
tables, err := mysqlctl.ResolveTables(ctx, agent.MysqlDaemon, topoproto.TabletDbName(tablet), blacklistedTables)
if err != nil {
return err
}
Expand Down Expand Up @@ -284,7 +284,7 @@ func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTabl
}
agent.setServicesDesiredState(disallowQueryService, runUpdateStream)
if updateBlacklistedTables {
if err := agent.loadBlacklistRules(newTablet, blacklistedTables); err != nil {
if err := agent.loadBlacklistRules(ctx, newTablet, blacklistedTables); err != nil {
// FIXME(alainjobart) how to handle this error?
log.Errorf("Cannot update blacklisted tables rule: %v", err)
} else {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
switch {
case len(ct.source.Tables) > 0:
// Table names can have search patterns. Resolve them against the schema.
tables, err := mysqlctl.ResolveTables(ct.mysqld, dbClient.DBName(), ct.source.Tables)
tables, err := mysqlctl.ResolveTables(ctx, ct.mysqld, dbClient.DBName(), ct.source.Tables)
if err != nil {
return vterrors.Wrap(err, "failed to resolve table names")
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (vr *vreplicator) Replicate(ctx context.Context) error {
}

func (vr *vreplicator) replicate(ctx context.Context) error {
tableKeys, err := vr.buildTableKeys()
tableKeys, err := vr.buildTableKeys(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -171,8 +171,8 @@ func (vr *vreplicator) replicate(ctx context.Context) error {
}
}

func (vr *vreplicator) buildTableKeys() (map[string][]string, error) {
schema, err := vr.mysqld.GetSchema(vr.dbClient.DBName(), []string{"/.*/"}, nil, false)
func (vr *vreplicator) buildTableKeys(ctx context.Context) (map[string][]string, error) {
schema, err := vr.mysqld.GetSchema(ctx, vr.dbClient.DBName(), []string{"/.*/"}, nil, false)
if err != nil {
return nil, err
}
Expand Down