From 7e6910541ad78ddabe4f33f5cdc3cd0a13bf58a2 Mon Sep 17 00:00:00 2001 From: Toliver Jue Date: Fri, 29 May 2020 22:27:51 +0900 Subject: [PATCH] Add context to MysqlDaemon schema functions Signed-off-by: Toliver Jue --- .../fakemysqldaemon/fakemysqldaemon.go | 10 ++--- go/vt/mysqlctl/mysql_daemon.go | 11 ++--- go/vt/mysqlctl/schema.go | 41 ++++++++++--------- go/vt/vttablet/tabletmanager/rpc_schema.go | 6 +-- go/vt/vttablet/tabletmanager/state_change.go | 6 +-- .../tabletmanager/vreplication/controller.go | 2 +- .../tabletmanager/vreplication/vreplicator.go | 6 +-- 7 files changed, 43 insertions(+), 39 deletions(-) diff --git a/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go index cdf443bbdb5..b5b302b1b3c 100644 --- a/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go @@ -435,7 +435,7 @@ func (fmd *FakeMysqlDaemon) CheckSuperQueryList() error { } // GetSchema is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { +func (fmd *FakeMysqlDaemon) GetSchema(ctx context.Context, dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { if fmd.SchemaFunc != nil { return fmd.SchemaFunc() } @@ -446,17 +446,17 @@ func (fmd *FakeMysqlDaemon) GetSchema(dbName string, tables, excludeTables []str } // GetColumns is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) GetColumns(dbName, table string) ([]*querypb.Field, []string, error) { +func (fmd *FakeMysqlDaemon) GetColumns(ctx context.Context, dbName, table string) ([]*querypb.Field, []string, error) { return []*querypb.Field{}, []string{}, nil } // GetPrimaryKeyColumns is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) GetPrimaryKeyColumns(dbName, table string) ([]string, error) { +func (fmd *FakeMysqlDaemon) GetPrimaryKeyColumns(ctx context.Context, dbName, table string) ([]string, error) { return []string{}, nil } // PreflightSchemaChange is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) PreflightSchemaChange(dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) { +func (fmd *FakeMysqlDaemon) PreflightSchemaChange(ctx context.Context, dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) { if fmd.PreflightSchemaChangeResult == nil { return nil, fmt.Errorf("no preflight result defined") } @@ -464,7 +464,7 @@ func (fmd *FakeMysqlDaemon) PreflightSchemaChange(dbName string, changes []strin } // ApplySchemaChange is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) ApplySchemaChange(dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) { +func (fmd *FakeMysqlDaemon) ApplySchemaChange(ctx context.Context, dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) { if fmd.ApplySchemaChangeResult == nil { return nil, fmt.Errorf("no apply schema defined") } diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index 94e439b8c0e..45c8afab16d 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -68,12 +68,13 @@ type MysqlDaemon interface { // Promote makes the current server master. It will not change // the read_only state of the server. Promote(map[string]string) (mysql.Position, error) + // Schema related methods - GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) - GetColumns(dbName, table string) ([]*querypb.Field, []string, error) - GetPrimaryKeyColumns(dbName, table string) ([]string, error) - PreflightSchemaChange(dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) - ApplySchemaChange(dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) + GetSchema(ctx context.Context, dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) + GetColumns(ctx context.Context, dbName, table string) ([]*querypb.Field, []string, error) + GetPrimaryKeyColumns(ctx context.Context, dbName, table string) ([]string, error) + PreflightSchemaChange(ctx context.Context, dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) + ApplySchemaChange(ctx context.Context, dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) // GetAppConnection returns a app connection to be able to talk to the database. GetAppConnection(ctx context.Context) (*dbconnpool.PooledDBConnection, error) diff --git a/go/vt/mysqlctl/schema.go b/go/vt/mysqlctl/schema.go index fbb63227658..7d57c5488cb 100644 --- a/go/vt/mysqlctl/schema.go +++ b/go/vt/mysqlctl/schema.go @@ -48,8 +48,7 @@ func (mysqld *Mysqld) executeSchemaCommands(sql string) error { // GetSchema returns the schema for database for tables listed in // tables. If tables is empty, return the schema for all tables. -func (mysqld *Mysqld) GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { - ctx := context.TODO() +func (mysqld *Mysqld) GetSchema(ctx context.Context, dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { sd := &tabletmanagerdatapb.SchemaDefinition{} backtickDBName := sqlescape.EscapeID(dbName) @@ -123,11 +122,11 @@ func (mysqld *Mysqld) GetSchema(dbName string, tables, excludeTables []string, i td.Name = tableName td.Schema = norm - td.Fields, td.Columns, err = mysqld.GetColumns(dbName, tableName) + td.Fields, td.Columns, err = mysqld.GetColumns(ctx, dbName, tableName) if err != nil { return nil, err } - td.PrimaryKeyColumns, err = mysqld.GetPrimaryKeyColumns(dbName, tableName) + td.PrimaryKeyColumns, err = mysqld.GetPrimaryKeyColumns(ctx, dbName, tableName) if err != nil { return nil, err } @@ -147,8 +146,8 @@ func (mysqld *Mysqld) GetSchema(dbName string, tables, excludeTables []string, i // ResolveTables returns a list of actual tables+views matching a list // of regexps -func ResolveTables(mysqld MysqlDaemon, dbName string, tables []string) ([]string, error) { - sd, err := mysqld.GetSchema(dbName, tables, nil, true) +func ResolveTables(ctx context.Context, mysqld MysqlDaemon, dbName string, tables []string) ([]string, error) { + sd, err := mysqld.GetSchema(ctx, dbName, tables, nil, true) if err != nil { return nil, err } @@ -160,13 +159,15 @@ func ResolveTables(mysqld MysqlDaemon, dbName string, tables []string) ([]string } // GetColumns returns the columns of table. -func (mysqld *Mysqld) GetColumns(dbName, table string) ([]*querypb.Field, []string, error) { - conn, err := getPoolReconnect(context.TODO(), mysqld.dbaPool) +func (mysqld *Mysqld) GetColumns(ctx context.Context, dbName, table string) ([]*querypb.Field, []string, error) { + conn, err := getPoolReconnect(ctx, mysqld.dbaPool) if err != nil { return nil, nil, err } defer conn.Recycle() - qr, err := conn.ExecuteFetch(fmt.Sprintf("SELECT * FROM %s.%s WHERE 1=0", sqlescape.EscapeID(dbName), sqlescape.EscapeID(table)), 0, true) + + sql := fmt.Sprintf("SELECT * FROM %s.%s WHERE 1=0", sqlescape.EscapeID(dbName), sqlescape.EscapeID(table)) + qr, err := mysqld.executeFetchContext(ctx, conn, sql, 0, true) if err != nil { return nil, nil, err } @@ -179,13 +180,15 @@ func (mysqld *Mysqld) GetColumns(dbName, table string) ([]*querypb.Field, []stri } // GetPrimaryKeyColumns returns the primary key columns of table. -func (mysqld *Mysqld) GetPrimaryKeyColumns(dbName, table string) ([]string, error) { - conn, err := getPoolReconnect(context.TODO(), mysqld.dbaPool) +func (mysqld *Mysqld) GetPrimaryKeyColumns(ctx context.Context, dbName, table string) ([]string, error) { + conn, err := getPoolReconnect(ctx, mysqld.dbaPool) if err != nil { return nil, err } defer conn.Recycle() - qr, err := conn.ExecuteFetch(fmt.Sprintf("SHOW INDEX FROM %s.%s", sqlescape.EscapeID(dbName), sqlescape.EscapeID(table)), 100, true) + + sql := fmt.Sprintf("SHOW INDEX FROM %s.%s", sqlescape.EscapeID(dbName), sqlescape.EscapeID(table)) + qr, err := mysqld.executeFetchContext(ctx, conn, sql, 100, true) if err != nil { return nil, err } @@ -231,11 +234,11 @@ func (mysqld *Mysqld) GetPrimaryKeyColumns(dbName, table string) ([]string, erro // PreflightSchemaChange checks the schema changes in "changes" by applying them // to an intermediate database that has the same schema as the target database. -func (mysqld *Mysqld) PreflightSchemaChange(dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) { +func (mysqld *Mysqld) PreflightSchemaChange(ctx context.Context, dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) { results := make([]*tabletmanagerdatapb.SchemaChangeResult, len(changes)) // Get current schema from the real database. - originalSchema, err := mysqld.GetSchema(dbName, nil, nil, true) + originalSchema, err := mysqld.GetSchema(ctx, dbName, nil, nil, true) if err != nil { return nil, err } @@ -267,7 +270,7 @@ func (mysqld *Mysqld) PreflightSchemaChange(dbName string, changes []string) ([] // For each change, record the schema before and after. for i, change := range changes { - beforeSchema, err := mysqld.GetSchema("_vt_preflight", nil, nil, true) + beforeSchema, err := mysqld.GetSchema(ctx, "_vt_preflight", nil, nil, true) if err != nil { return nil, err } @@ -281,7 +284,7 @@ func (mysqld *Mysqld) PreflightSchemaChange(dbName string, changes []string) ([] } // get the result - afterSchema, err := mysqld.GetSchema("_vt_preflight", nil, nil, true) + afterSchema, err := mysqld.GetSchema(ctx, "_vt_preflight", nil, nil, true) if err != nil { return nil, err } @@ -300,9 +303,9 @@ func (mysqld *Mysqld) PreflightSchemaChange(dbName string, changes []string) ([] } // ApplySchemaChange will apply the schema change to the given database. -func (mysqld *Mysqld) ApplySchemaChange(dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) { +func (mysqld *Mysqld) ApplySchemaChange(ctx context.Context, dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) { // check current schema matches - beforeSchema, err := mysqld.GetSchema(dbName, nil, nil, true) + beforeSchema, err := mysqld.GetSchema(ctx, dbName, nil, nil, true) if err != nil { return nil, err } @@ -349,7 +352,7 @@ func (mysqld *Mysqld) ApplySchemaChange(dbName string, change *tmutils.SchemaCha } // get AfterSchema - afterSchema, err := mysqld.GetSchema(dbName, nil, nil, true) + afterSchema, err := mysqld.GetSchema(ctx, dbName, nil, nil, true) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletmanager/rpc_schema.go b/go/vt/vttablet/tabletmanager/rpc_schema.go index 62e009f6916..04489418f35 100644 --- a/go/vt/vttablet/tabletmanager/rpc_schema.go +++ b/go/vt/vttablet/tabletmanager/rpc_schema.go @@ -31,7 +31,7 @@ import ( // GetSchema returns the schema. func (agent *ActionAgent) GetSchema(ctx context.Context, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { - return agent.MysqlDaemon.GetSchema(topoproto.TabletDbName(agent.Tablet()), tables, excludeTables, includeViews) + return agent.MysqlDaemon.GetSchema(ctx, topoproto.TabletDbName(agent.Tablet()), tables, excludeTables, includeViews) } // ReloadSchema will reload the schema @@ -69,7 +69,7 @@ func (agent *ActionAgent) PreflightSchema(ctx context.Context, changes []string) dbName := topoproto.TabletDbName(agent.Tablet()) // and preflight the change - return agent.MysqlDaemon.PreflightSchemaChange(dbName, changes) + return agent.MysqlDaemon.PreflightSchemaChange(ctx, dbName, changes) } // ApplySchema will apply a schema change @@ -83,7 +83,7 @@ func (agent *ActionAgent) ApplySchema(ctx context.Context, change *tmutils.Schem dbName := topoproto.TabletDbName(agent.Tablet()) // apply the change - scr, err := agent.MysqlDaemon.ApplySchemaChange(dbName, change) + scr, err := agent.MysqlDaemon.ApplySchemaChange(ctx, dbName, change) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletmanager/state_change.go b/go/vt/vttablet/tabletmanager/state_change.go index 5bacb3d625b..17a2b768ba2 100644 --- a/go/vt/vttablet/tabletmanager/state_change.go +++ b/go/vt/vttablet/tabletmanager/state_change.go @@ -57,11 +57,11 @@ var ( const blacklistQueryRules string = "BlacklistQueryRules" // loadBlacklistRules loads and builds the blacklist query rules -func (agent *ActionAgent) loadBlacklistRules(tablet *topodatapb.Tablet, blacklistedTables []string) (err error) { +func (agent *ActionAgent) loadBlacklistRules(ctx context.Context, tablet *topodatapb.Tablet, blacklistedTables []string) (err error) { blacklistRules := rules.New() if len(blacklistedTables) > 0 { // tables, first resolve wildcards - tables, err := mysqlctl.ResolveTables(agent.MysqlDaemon, topoproto.TabletDbName(tablet), blacklistedTables) + tables, err := mysqlctl.ResolveTables(ctx, agent.MysqlDaemon, topoproto.TabletDbName(tablet), blacklistedTables) if err != nil { return err } @@ -284,7 +284,7 @@ func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTabl } agent.setServicesDesiredState(disallowQueryService, runUpdateStream) if updateBlacklistedTables { - if err := agent.loadBlacklistRules(newTablet, blacklistedTables); err != nil { + if err := agent.loadBlacklistRules(ctx, newTablet, blacklistedTables); err != nil { // FIXME(alainjobart) how to handle this error? log.Errorf("Cannot update blacklisted tables rule: %v", err) } else { diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 42f53627ad0..7d5acf97cbf 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -199,7 +199,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { switch { case len(ct.source.Tables) > 0: // Table names can have search patterns. Resolve them against the schema. - tables, err := mysqlctl.ResolveTables(ct.mysqld, dbClient.DBName(), ct.source.Tables) + tables, err := mysqlctl.ResolveTables(ctx, ct.mysqld, dbClient.DBName(), ct.source.Tables) if err != nil { return vterrors.Wrap(err, "failed to resolve table names") } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 1302b3fd786..d7fcb54324e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -125,7 +125,7 @@ func (vr *vreplicator) Replicate(ctx context.Context) error { } func (vr *vreplicator) replicate(ctx context.Context) error { - tableKeys, err := vr.buildTableKeys() + tableKeys, err := vr.buildTableKeys(ctx) if err != nil { return err } @@ -171,8 +171,8 @@ func (vr *vreplicator) replicate(ctx context.Context) error { } } -func (vr *vreplicator) buildTableKeys() (map[string][]string, error) { - schema, err := vr.mysqld.GetSchema(vr.dbClient.DBName(), []string{"/.*/"}, nil, false) +func (vr *vreplicator) buildTableKeys(ctx context.Context) (map[string][]string, error) { + schema, err := vr.mysqld.GetSchema(ctx, vr.dbClient.DBName(), []string{"/.*/"}, nil, false) if err != nil { return nil, err }