Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1989,7 +1989,7 @@ func (e *Executor) terminateMigration(ctx context.Context, onlineDDL *schema.Onl
// CancelMigration attempts to abort a scheduled or a running migration
func (e *Executor) CancelMigration(ctx context.Context, uuid string, message string, issuedByUser bool) (result *sqltypes.Result, err error) {
if atomic.LoadInt64(&e.isOpen) == 0 {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "online ddl is disabled")
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, schema.ErrOnlineDDLDisabled.Error())
}
log.Infof("CancelMigration: request to cancel %s with message: %v", uuid, message)

Expand Down Expand Up @@ -2060,7 +2060,7 @@ func (e *Executor) cancelMigrations(ctx context.Context, cancellable []*cancella
// for this keyspace
func (e *Executor) CancelPendingMigrations(ctx context.Context, message string, issuedByUser bool) (result *sqltypes.Result, err error) {
if atomic.LoadInt64(&e.isOpen) == 0 {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "online ddl is disabled")
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, schema.ErrOnlineDDLDisabled.Error())
}

uuids, err := e.readPendingMigrationsUUIDs(ctx)
Expand Down Expand Up @@ -4335,7 +4335,7 @@ func (e *Executor) retryMigrationWhere(ctx context.Context, whereExpr string) (r
// RetryMigration marks given migration for retry
func (e *Executor) RetryMigration(ctx context.Context, uuid string) (result *sqltypes.Result, err error) {
if atomic.LoadInt64(&e.isOpen) == 0 {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "online ddl is disabled")
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, schema.ErrOnlineDDLDisabled.Error())
}
if !schema.IsOnlineDDLUUID(uuid) {
return nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Not a valid migration ID in RETRY: %s", uuid)
Expand All @@ -4359,7 +4359,7 @@ func (e *Executor) RetryMigration(ctx context.Context, uuid string) (result *sql
// next iteration of gcArtifacts() picks up the migration's artifacts and schedules them for deletion
func (e *Executor) CleanupMigration(ctx context.Context, uuid string) (result *sqltypes.Result, err error) {
if atomic.LoadInt64(&e.isOpen) == 0 {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "online ddl is disabled")
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, schema.ErrOnlineDDLDisabled.Error())
}
if !schema.IsOnlineDDLUUID(uuid) {
return nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Not a valid migration ID in CLEANUP: %s", uuid)
Expand All @@ -4385,7 +4385,7 @@ func (e *Executor) CleanupMigration(ctx context.Context, uuid string) (result *s
// CompleteMigration clears the postpone_completion flag for a given migration, assuming it was set in the first place
func (e *Executor) CompleteMigration(ctx context.Context, uuid string) (result *sqltypes.Result, err error) {
if atomic.LoadInt64(&e.isOpen) == 0 {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "online ddl is disabled")
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, schema.ErrOnlineDDLDisabled.Error())
}
if !schema.IsOnlineDDLUUID(uuid) {
return nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Not a valid migration ID in COMPLETE: %s", uuid)
Expand Down Expand Up @@ -4419,7 +4419,7 @@ func (e *Executor) CompleteMigration(ctx context.Context, uuid string) (result *
// for this keyspace
func (e *Executor) CompletePendingMigrations(ctx context.Context) (result *sqltypes.Result, err error) {
if atomic.LoadInt64(&e.isOpen) == 0 {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "online ddl is disabled")
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, schema.ErrOnlineDDLDisabled.Error())
}

uuids, err := e.readPendingMigrationsUUIDs(ctx)
Expand All @@ -4444,7 +4444,7 @@ func (e *Executor) CompletePendingMigrations(ctx context.Context) (result *sqlty
// LaunchMigration clears the postpone_launch flag for a given migration, assuming it was set in the first place
func (e *Executor) LaunchMigration(ctx context.Context, uuid string, shardsArg string) (result *sqltypes.Result, err error) {
if atomic.LoadInt64(&e.isOpen) == 0 {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "online ddl is disabled")
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, schema.ErrOnlineDDLDisabled.Error())
}
if !schema.IsOnlineDDLUUID(uuid) {
return nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Not a valid migration ID in EXECUTE: %s", uuid)
Expand Down Expand Up @@ -4476,7 +4476,7 @@ func (e *Executor) LaunchMigration(ctx context.Context, uuid string, shardsArg s
// LaunchMigrations launches all launch-postponed queued migrations for this keyspace
func (e *Executor) LaunchMigrations(ctx context.Context) (result *sqltypes.Result, err error) {
if atomic.LoadInt64(&e.isOpen) == 0 {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "online ddl is disabled")
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, schema.ErrOnlineDDLDisabled.Error())
}

uuids, err := e.readPendingMigrationsUUIDs(ctx)
Expand Down Expand Up @@ -4598,7 +4598,7 @@ func (e *Executor) SubmitMigration(
stmt sqlparser.Statement,
) (*sqltypes.Result, error) {
if atomic.LoadInt64(&e.isOpen) == 0 {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "online ddl is disabled")
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, schema.ErrOnlineDDLDisabled.Error())
}

log.Infof("SubmitMigration: request to submit migration with statement: %0.50s...", sqlparser.CanonicalString(stmt))
Expand Down Expand Up @@ -4693,10 +4693,35 @@ func (e *Executor) SubmitMigration(
return result, nil
}

// ShowMigrations shows migrations, optionally filtered by a condition
func (e *Executor) ShowMigrations(ctx context.Context, show *sqlparser.Show) (result *sqltypes.Result, err error) {
if atomic.LoadInt64(&e.isOpen) == 0 {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, schema.ErrOnlineDDLDisabled.Error())
}
showBasic, ok := show.Internal.(*sqlparser.ShowBasic)
if !ok {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] ShowMigrations expects a ShowBasic statement. Got: %s", sqlparser.String(show))
}
if showBasic.Command != sqlparser.VitessMigrations {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] ShowMigrations expects a VitessMigrations command, got %+v. Statement: %s", showBasic.Command, sqlparser.String(show))
}
whereExpr := ""
if showBasic.Filter != nil {
if showBasic.Filter.Filter != nil {
whereExpr = fmt.Sprintf(" where %s", sqlparser.String(showBasic.Filter.Filter))
} else if showBasic.Filter.Like != "" {
lit := sqlparser.String(sqlparser.NewStrLiteral(showBasic.Filter.Like))
whereExpr = fmt.Sprintf(" where migration_uuid LIKE %s OR migration_context LIKE %s OR migration_status LIKE %s", lit, lit, lit)
}
}
query := sqlparser.BuildParsedQuery(sqlShowMigrationsWhere, whereExpr).Query
return e.execQuery(ctx, query)
}

// ShowMigrationLogs reads the migration log for a given migration
func (e *Executor) ShowMigrationLogs(ctx context.Context, stmt *sqlparser.ShowMigrationLogs) (result *sqltypes.Result, err error) {
if atomic.LoadInt64(&e.isOpen) == 0 {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "online ddl is disabled")
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, schema.ErrOnlineDDLDisabled.Error())
}
_, row, err := e.readMigration(ctx, stmt.UUID)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,10 @@ const (
AND cleanup_timestamp IS NULL
AND completed_timestamp IS NULL
`
sqlShowMigrationsWhere = `SELECT *
FROM _vt.schema_migrations
%s
`
sqlSelectMigration = `SELECT
id,
migration_uuid,
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vttablet/tabletserver/planbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ func analyzeInsert(ins *sqlparser.Insert, tables map[string]*schema.Table) (plan
func analyzeShow(show *sqlparser.Show, dbName string) (plan *Plan, err error) {
switch showInternal := show.Internal.(type) {
case *sqlparser.ShowBasic:
if showInternal.Command == sqlparser.Table {
switch showInternal.Command {
case sqlparser.VitessMigrations:
return &Plan{PlanID: PlanShowMigrations, FullStmt: show}, nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i chose to return a PlanShowMigrations rather than returning a PlanSelect and generating SQL here.

Copy link
Member

@harshit-gangal harshit-gangal Mar 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The benefit of doing it at plan time is that it is cached. So, at execution, it does not need to create the select query and parse it.
I will leave this to you as it is not a critical query on execution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caching is a good point. I feel like it's wrong to construct this query in query_executor, though, and that it makes more sense in onlineddl.Executor. We can always change that in the future.

case sqlparser.Table:
// rewrite WHERE clause if it exists
// `where Tables_in_Keyspace` => `where Tables_in_DbName`
if showInternal.Filter != nil {
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletserver/planbuilder/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const (
PlanCallProc
PlanAlterMigration
PlanRevertMigration
PlanShowMigrations
PlanShowMigrationLogs
PlanShowThrottledApps
PlanShowThrottlerStatus
Expand Down Expand Up @@ -112,6 +113,7 @@ var planName = []string{
"CallProcedure",
"AlterMigration",
"RevertMigration",
"ShowMigrations",
"ShowMigrationLogs",
"ShowThrottledApps",
"ShowThrottlerStatus",
Expand Down
9 changes: 9 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
return qre.execAlterMigration()
case p.PlanRevertMigration:
return qre.execRevertMigration()
case p.PlanShowMigrations:
return qre.execShowMigrations()
case p.PlanShowMigrationLogs:
return qre.execShowMigrationLogs()
case p.PlanShowThrottledApps:
Expand Down Expand Up @@ -1057,6 +1059,13 @@ func (qre *QueryExecutor) execRevertMigration() (*sqltypes.Result, error) {
return qre.tsv.onlineDDLExecutor.SubmitMigration(qre.ctx, qre.plan.FullStmt)
}

func (qre *QueryExecutor) execShowMigrations() (*sqltypes.Result, error) {
if showStmt, ok := qre.plan.FullStmt.(*sqlparser.Show); ok {
return qre.tsv.onlineDDLExecutor.ShowMigrations(qre.ctx, showStmt)
}
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "Expecting SHOW VITESS_MIGRATIONS plan")
}

func (qre *QueryExecutor) execShowMigrationLogs() (*sqltypes.Result, error) {
if showMigrationLogsStmt, ok := qre.plan.FullStmt.(*sqlparser.ShowMigrationLogs); ok {
return qre.tsv.onlineDDLExecutor.ShowMigrationLogs(qre.ctx, showMigrationLogsStmt)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func TestDisableOnlineDDL(t *testing.T) {

qre = newTestQueryExecutor(ctx, tsv, query, 0)
_, err = qre.Execute()
require.EqualError(t, err, "online ddl is disabled")
require.EqualError(t, err, "online DDL is disabled")
}

func TestQueryExecutorLimitFailure(t *testing.T) {
Expand Down