Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions go/vt/schemamanager/schemamanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,29 @@ func newFakeTopo(t *testing.T) *topo.Server {
t.Fatalf("UpdateShardFields failed: %v", err)
}
}
if err := ts.CreateKeyspace(ctx, "unsharded_keyspace", &topodatapb.Keyspace{}); err != nil {
t.Fatalf("CreateKeyspace failed: %v", err)
}
if err := ts.CreateShard(ctx, "unsharded_keyspace", "0"); err != nil {
t.Fatalf("CreateShard(%v) failed: %v", "0", err)
}
tablet := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "test_cell",
Uid: uint32(4),
},
Keyspace: "test_keyspace",
Shard: "0",
}
if err := ts.CreateTablet(ctx, tablet); err != nil {
t.Fatalf("CreateTablet failed: %v", err)
}
if _, err := ts.UpdateShardFields(ctx, "unsharded_keyspace", "0", func(si *topo.ShardInfo) error {
si.Shard.MasterAlias = tablet.Alias
return nil
}); err != nil {
t.Fatalf("UpdateShardFields failed: %v", err)
}
return ts
}

Expand Down
68 changes: 22 additions & 46 deletions go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,16 @@ import (
"golang.org/x/net/context"

"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/wrangler"

tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// TabletExecutor applies schema changes to all tablets.
type TabletExecutor struct {
wr *wrangler.Wrangler
tablets []*topodatapb.Tablet
schemaDiffs []*tabletmanagerdatapb.SchemaChangeResult
isClosed bool
allowBigSchemaChange bool
keyspace string
Expand Down Expand Up @@ -98,29 +95,13 @@ func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error {
return nil
}

func parseDDLs(sqls []string) ([]*sqlparser.DDL, error) {
parsedDDLs := make([]*sqlparser.DDL, len(sqls))
for i, sql := range sqls {
stat, err := sqlparser.Parse(sql)
if err != nil {
return nil, fmt.Errorf("failed to parse sql: %s, got error: %v", sql, err)
}
ddl, ok := stat.(*sqlparser.DDL)
if !ok {
return nil, fmt.Errorf("schema change works for DDLs only, but get non DDL statement: %s", sql)
}
parsedDDLs[i] = ddl
}
return parsedDDLs, nil
}

// Validate validates a list of sql statements.
func (exec *TabletExecutor) Validate(ctx context.Context, sqls []string) error {
if exec.isClosed {
return fmt.Errorf("executor is closed")
}

parsedDDLs, err := parseDDLs(sqls)
parsedDDLs, err := exec.parseDDLs(sqls)
if err != nil {
return err
}
Expand All @@ -133,6 +114,25 @@ func (exec *TabletExecutor) Validate(ctx context.Context, sqls []string) error {
return err
}

func (exec *TabletExecutor) parseDDLs(sqls []string) ([]*sqlparser.DDL, error) {
parsedDDLs := make([]*sqlparser.DDL, 0, len(sqls))
for _, sql := range sqls {
stat, err := sqlparser.Parse(sql)
if err != nil {
return nil, fmt.Errorf("failed to parse sql: %s, got error: %v", sql, err)
}
ddl, ok := stat.(*sqlparser.DDL)
if !ok {
if len(exec.tablets) != 1 {
return nil, fmt.Errorf("non-ddl statements can only be executed for single shard keyspaces: %s", sql)
}
continue
}
parsedDDLs = append(parsedDDLs, ddl)
}
return parsedDDLs, nil
}

// a schema change that satisfies any following condition is considered
// to be a big schema change and will be rejected.
// 1. Alter more than 100,000 rows.
Expand Down Expand Up @@ -172,32 +172,8 @@ func (exec *TabletExecutor) detectBigSchemaChanges(ctx context.Context, parsedDD
}

func (exec *TabletExecutor) preflightSchemaChanges(ctx context.Context, sqls []string) error {
schemaDiffs, err := exec.wr.TabletManagerClient().PreflightSchema(ctx, exec.tablets[0], sqls)
if err != nil {
return err
}

parsedDDLs, err := parseDDLs(sqls)
if err != nil {
return err
}

for i, schemaDiff := range schemaDiffs {
diffs := tmutils.DiffSchemaToArray(
"BeforeSchema",
schemaDiff.BeforeSchema,
"AfterSchema",
schemaDiff.AfterSchema)
if len(diffs) == 0 {
if parsedDDLs[i].Action == sqlparser.DropStr && parsedDDLs[i].IfExists {
// DROP IF EXISTS on a nonexistent table does not change the schema. It's safe to ignore.
continue
}
return fmt.Errorf("schema change: '%s' does not introduce any table definition change", sqls[i])
}
}
exec.schemaDiffs = schemaDiffs
return nil
_, err := exec.wr.TabletManagerClient().PreflightSchema(ctx, exec.tablets[0], sqls)
return err
}

// Execute applies schema changes
Expand Down
47 changes: 37 additions & 10 deletions go/vt/schemamanager/tablet_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,27 +163,54 @@ func TestTabletExecutorValidate(t *testing.T) {
}
}

func TestTabletExecutorExecute(t *testing.T) {
executor := newFakeExecutor(t)
func TestTabletExecutorDML(t *testing.T) {
fakeTmc := newFakeTabletManagerClient()

fakeTmc.AddSchemaDefinition("vt_test_keyspace", &tabletmanagerdatapb.SchemaDefinition{
DatabaseSchema: "CREATE DATABASE `{{.DatabaseName}}` /*!40100 DEFAULT CHARACTER SET utf8 */",
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: "test_table",
Schema: "table schema",
Type: tmutils.TableBaseTable,
},
{
Name: "test_table_03",
Schema: "table schema",
Type: tmutils.TableBaseTable,
RowCount: 200000,
},
{
Name: "test_table_04",
Schema: "table schema",
Type: tmutils.TableBaseTable,
RowCount: 3000000,
},
},
})

wr := wrangler.New(logutil.NewConsoleLogger(), newFakeTopo(t), fakeTmc)
executor := NewTabletExecutor(wr, testWaitSlaveTimeout)
ctx := context.Background()

sqls := []string{"DROP TABLE unknown_table"}
executor.Open(ctx, "unsharded_keyspace")
defer executor.Close()

result := executor.Execute(ctx, sqls)
if result.ExecutorErr == "" {
t.Fatalf("execute should fail, call execute.Open first")
// schema changes with DMLs should fail
if err := executor.Validate(ctx, []string{
"INSERT INTO test_table VALUES(1)"}); err != nil {
t.Fatalf("executor.Validate should succeed, for DML to unsharded keyspace")
}
}

func TestTabletExecutorExecute_PreflightWithoutChangesIsAnError(t *testing.T) {
func TestTabletExecutorExecute(t *testing.T) {
executor := newFakeExecutor(t)
ctx := context.Background()
executor.Open(ctx, "test_keyspace")
defer executor.Close()

sqls := []string{"DROP TABLE unknown_table"}

result := executor.Execute(ctx, sqls)
if result.ExecutorErr == "" {
t.Fatalf("execute should fail, ddl does not introduce any table schema change")
t.Fatalf("execute should fail, call execute.Open first")
}
}