Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions doc/releasenotes/12_0_0_summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ If you have reserved connections disabled, you will get the `old` Vitess behavio
## Deprecations

### CLI commands
`VExec` is deprecated and removed. All Online DDL commands should be run through `OnlineDDL`.
`vtctl VExec` does not support Online DDL queries. All Online DDL commands should be run through `OnlineDDL`.

`OnlineDDL revert` is deprecated. Use `REVERT VITESS_MIGRATION '...'` SQL command either via `ApplySchema` or via `vtgate`.
`vtctl OnlineDDL revert` is deprecated. Use `REVERT VITESS_MIGRATION '...'` SQL command either via `ApplySchema` or via `vtgate`.

`InitShardMaster` is deprecated, use `InitShardPrimary` instead.

Expand Down
3 changes: 1 addition & 2 deletions go/vt/schema/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ var (
)

const (
SchemaMigrationsTableName = "schema_migrations"
RevertActionStr = "revert"
RevertActionStr = "revert"
)

func validateWalk(node sqlparser.SQLNode) (kontinue bool, err error) {
Expand Down
46 changes: 46 additions & 0 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,16 @@ var commands = []commandGroup{
},
},
},
{
"Workflow", []command{
{
name: "VExec",
method: commandVExec,
params: "<ks.workflow> <query> --dry-run",
help: "Runs query on all tablets in workflow. Example: VExec merchant.morders \"update _vt.vreplication set Status='Running'\"",
},
},
},
{
"Workflow", []command{
{
Expand Down Expand Up @@ -3791,6 +3801,42 @@ func commandHelp(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Flag
return nil
}

func commandVExec(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
json := subFlags.Bool("json", false, "Output JSON instead of human-readable table")
dryRun := subFlags.Bool("dry_run", false, "Does a dry run of VExec and only reports the final query and list of tablets on which it will be applied")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 2 {
return fmt.Errorf("usage: VExec --dry-run keyspace.workflow \"<query>\"")
}
keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0))
if err != nil {
return err
}
_, err = wr.TopoServer().GetKeyspace(ctx, keyspace)
if err != nil {
wr.Logger().Errorf("keyspace %s not found", keyspace)
}
query := subFlags.Arg(1)

qr, err := wr.VExecResult(ctx, workflow, keyspace, query, *dryRun)
if err != nil {
return err
}
if *dryRun {
return nil
}
if qr == nil {
wr.Logger().Printf("no result returned\n")
}
if *json {
return printJSON(wr.Logger(), qr)
}
printQueryResult(loggerWriter{wr.Logger()}, qr)
return nil
}

func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
dryRun := subFlags.Bool("dry_run", false, "Does a dry run of Workflow and only reports the final query and list of tablets on which the operation will be applied")
if err := subFlags.Parse(args); err != nil {
Expand Down
112 changes: 0 additions & 112 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tmclient"
"vitess.io/vitess/go/vt/vttablet/vexec"

mysqldriver "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
Expand All @@ -75,31 +74,6 @@ var (
ErrMigrationNotFound = errors.New("migration not found")
)

var vexecUpdateTemplates = []string{
`update _vt.schema_migrations set migration_status='val' where mysql_schema='val'`,
`update _vt.schema_migrations set migration_status='val' where migration_uuid='val' and mysql_schema='val'`,
`update _vt.schema_migrations set migration_status='val' where migration_uuid='val' and mysql_schema='val' and shard='val'`,
}

var vexecInsertTemplates = []string{
`INSERT IGNORE INTO _vt.schema_migrations (
migration_uuid,
keyspace,
shard,
mysql_schema,
mysql_table,
migration_statement,
strategy,
options,
ddl_action,
requested_timestamp,
migration_context,
migration_status
) VALUES (
'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', FROM_UNIXTIME(0), 'val', 'val'
)`,
}

var emptyResult = &sqltypes.Result{}
var acceptableDropTableIfExistsErrorCodes = []int{mysql.ERCantFindFile, mysql.ERNoSuchTable}

Expand Down Expand Up @@ -275,11 +249,6 @@ func (e *Executor) Open() error {
e.ticks.Start(e.onMigrationCheckTick)
e.triggerNextCheckInterval()

if _, err := sqlparser.QueryMatchesTemplates("select 1 from dual", vexecUpdateTemplates); err != nil {
// this validates vexecUpdateTemplates
return err
}

e.isOpen = true

return nil
Expand Down Expand Up @@ -2992,84 +2961,3 @@ func (e *Executor) OnSchemaMigrationStatus(ctx context.Context,

return e.onSchemaMigrationStatus(ctx, uuidParam, status, dryRun, progressPct, etaSeconds, rowsCopied)
}

// VExec is called by a VExec invocation
// Implements vitess.io/vitess/go/vt/vttablet/vexec.Executor interface
func (e *Executor) VExec(ctx context.Context, vx *vexec.TabletVExec) (qr *querypb.QueryResult, err error) {
response := func(result *sqltypes.Result, err error) (*querypb.QueryResult, error) {
if err != nil {
return nil, err
}
return sqltypes.ResultToProto3(result), nil
}

if err := e.initSchema(ctx); err != nil {
log.Error(err)
return nil, err
}

switch stmt := vx.Stmt.(type) {
case *sqlparser.Delete:
return nil, fmt.Errorf("DELETE statements not supported for this table. query=%s", vx.Query)
case *sqlparser.Select:
return response(e.execQuery(ctx, vx.Query))
case *sqlparser.Insert:
match, err := sqlparser.QueryMatchesTemplates(vx.Query, vexecInsertTemplates)
if err != nil {
return nil, err
}
if !match {
return nil, fmt.Errorf("Query must match one of these templates: %s", strings.Join(vexecInsertTemplates, "; "))
}
// Vexec naturally runs outside shard/schema context. It does not supply values for those columns.
// We can fill them in.
vx.ReplaceInsertColumnVal("shard", vx.ToStringVal(e.shard))
vx.ReplaceInsertColumnVal("mysql_schema", vx.ToStringVal(e.dbName))
vx.AddOrReplaceInsertColumnVal("tablet", vx.ToStringVal(e.TabletAliasString()))
e.triggerNextCheckInterval()
return response(e.execQuery(ctx, vx.Query))
case *sqlparser.Update:
match, err := sqlparser.QueryMatchesTemplates(vx.Query, vexecUpdateTemplates)
if err != nil {
return nil, err
}
if !match {
return nil, fmt.Errorf("Query must match one of these templates: %s; query=%s", strings.Join(vexecUpdateTemplates, "; "), vx.Query)
}
if shard, _ := vx.ColumnStringVal(vx.WhereCols, "shard"); shard != "" {
// shard is specified.
if shard != e.shard {
// specified shard is not _this_ shard. So we're skipping this UPDATE
return sqltypes.ResultToProto3(emptyResult), nil
}
}
statusVal, err := vx.ColumnStringVal(vx.UpdateCols, "migration_status")
if err != nil {
return nil, err
}
switch statusVal {
case retryMigrationHint:
return response(e.retryMigrationWhere(ctx, sqlparser.String(stmt.Where.Expr)))
case cancelMigrationHint:
uuid, err := vx.ColumnStringVal(vx.WhereCols, "migration_uuid")
if err != nil {
return nil, err
}
if !schema.IsOnlineDDLUUID(uuid) {
return nil, fmt.Errorf("Not an Online DDL UUID: %s", uuid)
}
return response(e.CancelMigration(ctx, uuid, "cancel by user"))
case cancelAllMigrationHint:
uuid, _ := vx.ColumnStringVal(vx.WhereCols, "migration_uuid")
if uuid != "" {
return nil, fmt.Errorf("Unexpetced UUID: %s", uuid)
}
return response(e.CancelPendingMigrations(ctx, "cancel-all by user"))
default:
return nil, fmt.Errorf("Unexpected value for migration_status: %v. Supported values are: %s, %s",
statusVal, retryMigrationHint, cancelMigrationHint)
}
default:
return nil, fmt.Errorf("No handler for this query: %s", vx.Query)
}
}
6 changes: 0 additions & 6 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,12 +482,6 @@ const (
sqlRenameTable = "RENAME TABLE `%a` TO `%a`"
)

const (
retryMigrationHint = "retry"
cancelMigrationHint = "cancel"
cancelAllMigrationHint = "cancel-all"
)

var (
sqlCreateOnlineDDLUser = []string{
`CREATE USER IF NOT EXISTS %s IDENTIFIED BY '%s'`,
Expand Down
3 changes: 0 additions & 3 deletions go/vt/vttablet/tabletmanager/rpc_vexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"

querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet/vexec"

"context"
Expand All @@ -33,8 +32,6 @@ func (tm *TabletManager) VExec(ctx context.Context, query, workflow, keyspace st
return nil, err
}
switch vx.TableName {
case fmt.Sprintf("%s.%s", vexec.TableQualifier, schema.SchemaMigrationsTableName):
return tm.QueryServiceControl.OnlineDDLExecutor().VExec(ctx, vx)
default:
return nil, fmt.Errorf("table not supported by vexec: %v", vx.TableName)
}
Expand Down
4 changes: 0 additions & 4 deletions go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/vexec"

"time"

Expand Down Expand Up @@ -82,9 +81,6 @@ type Controller interface {
// QueryService returns the QueryService object used by this Controller
QueryService() queryservice.QueryService

// OnlineDDLExecutor the online DDL executor used by this Controller
OnlineDDLExecutor() vexec.Executor

// SchemaEngine returns the SchemaEngine object used by this Controller
SchemaEngine() *schema.Engine

Expand Down
6 changes: 0 additions & 6 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/txserializer"
"vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
"vitess.io/vitess/go/vt/vttablet/vexec"
)

// logPoolFull is for throttling transaction / query pool full messages in the log.
Expand Down Expand Up @@ -429,11 +428,6 @@ func (tsv *TabletServer) QueryService() queryservice.QueryService {
return tsv
}

// OnlineDDLExecutor returns the onlineddl.Executor part of TabletServer.
func (tsv *TabletServer) OnlineDDLExecutor() vexec.Executor {
return tsv.onlineDDLExecutor
}

// LagThrottler returns the throttle.Throttler part of TabletServer.
func (tsv *TabletServer) LagThrottler() *throttle.Throttler {
return tsv.lagThrottler
Expand Down
6 changes: 0 additions & 6 deletions go/vt/vttablet/tabletservermock/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/vexec"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -176,11 +175,6 @@ func (tqsc *Controller) ReloadSchema(ctx context.Context) error {
return nil
}

// OnlineDDLExecutor is part of the tabletserver.Controller interface
func (tqsc *Controller) OnlineDDLExecutor() vexec.Executor {
return nil
}

//ClearQueryPlanCache is part of the tabletserver.Controller interface
func (tqsc *Controller) ClearQueryPlanCache() {
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflowName, sou
if err != nil {
wr.Logger().Printf("Error converting report to json: %v", err.Error())
}
jsonOutput += fmt.Sprintf("%s", json)
jsonOutput += string(json)
wr.logger.Printf("%s", jsonOutput)
} else {
for table, dr := range diffReports {
Expand Down
54 changes: 1 addition & 53 deletions go/vt/wrangler/vexec_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"

"github.com/olekukonko/tablewriter"
Expand Down Expand Up @@ -109,57 +108,8 @@ func (p vreplicationPlanner) dryRun(ctx context.Context) error {
return nil
}

// schemaMigrationsPlanner is a vexecPlanner implementation, specific to _vt.schema_migrations table
type schemaMigrationsPlanner struct {
vx *vexec
d *vexecPlannerParams
}

func newSchemaMigrationsPlanner(vx *vexec) vexecPlanner {
return &schemaMigrationsPlanner{
vx: vx,
d: &vexecPlannerParams{
dbNameColumn: "mysql_schema",
workflowColumn: "migration_uuid",
updateTemplates: []string{
`update _vt.schema_migrations set migration_status='val1'`,
`update _vt.schema_migrations set migration_status='val1' where migration_uuid='val2'`,
`update _vt.schema_migrations set migration_status='val1' where migration_uuid='val2' and shard='val3'`,
},
insertTemplates: []string{
`INSERT IGNORE INTO _vt.schema_migrations (
migration_uuid,
keyspace,
shard,
mysql_schema,
mysql_table,
migration_statement,
strategy,
options,
ddl_action,
requested_timestamp,
migration_context,
migration_status
) VALUES (
'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', 'val', FROM_UNIXTIME(0), 'val', 'val'
)`,
},
},
}
}
func (p schemaMigrationsPlanner) params() *vexecPlannerParams { return p.d }
func (p schemaMigrationsPlanner) exec(ctx context.Context, primaryAlias *topodatapb.TabletAlias, query string) (*querypb.QueryResult, error) {
qr, err := p.vx.wr.GenericVExec(ctx, primaryAlias, query, p.vx.workflow, p.vx.keyspace)
if err != nil {
return nil, err
}
return qr, nil
}
func (p schemaMigrationsPlanner) dryRun(ctx context.Context) error { return nil }

// make sure these planners implement vexecPlanner interface
var _ vexecPlanner = vreplicationPlanner{}
var _ vexecPlanner = schemaMigrationsPlanner{}

const (
updateQuery = iota
Expand All @@ -168,7 +118,7 @@ const (
selectQuery
)

// extractTableName returns the qualified table name (e.g. "_vt.schema_migrations") from a SELECT/DELETE/UPDATE statement
// extractTableName returns the qualified table name (e.g. "_vt.vreplication") from a SELECT/DELETE/UPDATE statement
func extractTableName(stmt sqlparser.Statement) (string, error) {
switch stmt := stmt.(type) {
case *sqlparser.Update:
Expand All @@ -191,8 +141,6 @@ func qualifiedTableName(tableName string) string {
// getPlanner returns a specific planner appropriate for the queried table
func (vx *vexec) getPlanner(ctx context.Context) error {
switch vx.tableName {
case qualifiedTableName(schema.SchemaMigrationsTableName):
vx.planner = newSchemaMigrationsPlanner(vx)
case qualifiedTableName(vreplicationTableName):
vx.planner = newVReplicationPlanner(vx)
default:
Expand Down