Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,10 @@ var commands = []commandGroup{
"[-concurrency=10] [-include_master=false] <keyspace>",
"Reloads the schema on all the tablets in a keyspace."},
{"ValidateSchemaShard", commandValidateSchemaShard,
"[-exclude_tables=''] [-include-views] <keyspace/shard>",
"[-exclude_tables=''] [-include-views] [-include-vschema] <keyspace/shard>",
"Validates that the master schema matches all of the replicas."},
{"ValidateSchemaKeyspace", commandValidateSchemaKeyspace,
"[-exclude_tables=''] [-include-views] [-skip-no-master] <keyspace name>",
"[-exclude_tables=''] [-include-views] [-skip-no-master] [-include-vschema] <keyspace name>",
"Validates that the master schema from shard 0 matches the schema on all of the other tablets in the keyspace."},
{"ApplySchema", commandApplySchema,
"[-allow_long_unavailability] [-wait_replicas_timeout=10s] [-ddl_strategy=<ddl_strategy>] [-request_context=<unique-request-context>] [-skip_preflight] {-sql=<sql> || -sql-file=<filename>} <keyspace>",
Expand Down Expand Up @@ -2768,6 +2768,7 @@ func commandReloadSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, sub
func commandValidateSchemaShard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
excludeTables := subFlags.String("exclude_tables", "", "Specifies a comma-separated list of tables to exclude. Each is either an exact match, or a regular expression of the form /regexp/")
includeViews := subFlags.Bool("include-views", false, "Includes views in the validation")
includeVSchema := subFlags.Bool("include-vschema", false, "Validate schemas against the vschema")
if err := subFlags.Parse(args); err != nil {
return err
}
Expand All @@ -2783,13 +2784,14 @@ func commandValidateSchemaShard(ctx context.Context, wr *wrangler.Wrangler, subF
if *excludeTables != "" {
excludeTableArray = strings.Split(*excludeTables, ",")
}
return wr.ValidateSchemaShard(ctx, keyspace, shard, excludeTableArray, *includeViews, false /*includeVSchema*/)
return wr.ValidateSchemaShard(ctx, keyspace, shard, excludeTableArray, *includeViews, *includeVSchema)
}

func commandValidateSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
excludeTables := subFlags.String("exclude_tables", "", "Specifies a comma-separated list of tables to exclude. Each is either an exact match, or a regular expression of the form /regexp/")
includeViews := subFlags.Bool("include-views", false, "Includes views in the validation")
skipNoMaster := subFlags.Bool("skip-no-master", false, "Skip shards that don't have master when performing validation")
includeVSchema := subFlags.Bool("include-vschema", false, "Validate schemas against the vschema")
if err := subFlags.Parse(args); err != nil {
return err
}
Expand All @@ -2802,7 +2804,7 @@ func commandValidateSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, s
if *excludeTables != "" {
excludeTableArray = strings.Split(*excludeTables, ",")
}
return wr.ValidateSchemaKeyspace(ctx, keyspace, excludeTableArray, *includeViews, *skipNoMaster)
return wr.ValidateSchemaKeyspace(ctx, keyspace, excludeTableArray, *includeViews, *skipNoMaster, *includeVSchema)
}

func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctld/vtctld.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func InitVtctld(ts *topo.Server) {

actionRepo.RegisterKeyspaceAction("ValidateSchemaKeyspace",
func(ctx context.Context, wr *wrangler.Wrangler, keyspace string) (string, error) {
return "", wr.ValidateSchemaKeyspace(ctx, keyspace, nil, false, false)
return "", wr.ValidateSchemaKeyspace(ctx, keyspace, nil /*excludeTables*/, false /*includeViews*/, false /*skipNoMaster*/, false /*includeVSchema*/)
})

actionRepo.RegisterKeyspaceAction("ValidateVersionKeyspace",
Expand Down
73 changes: 58 additions & 15 deletions go/vt/wrangler/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,20 +160,9 @@ func (wr *Wrangler) ValidateSchemaShard(ctx context.Context, keyspace, shard str
}

if includeVSchema {
vschm, err := wr.ts.GetVSchema(ctx, keyspace)
err := wr.ValidateVSchema(ctx, keyspace, []string{shard}, excludeTables, includeViews)
if err != nil {
return fmt.Errorf("GetVSchema(%s) failed: %v", keyspace, err)
}
notFoundTables := []string{}

for _, tableDef := range masterSchema.TableDefinitions {
if _, ok := vschm.Tables[tableDef.Name]; !ok {
notFoundTables = append(notFoundTables, tableDef.Name)
}
}

if len(notFoundTables) > 0 {
return fmt.Errorf("Vschema Validation Failed: the following tables were not found in the vschema %v", notFoundTables)
return err
}
}

Expand Down Expand Up @@ -204,7 +193,7 @@ func (wr *Wrangler) ValidateSchemaShard(ctx context.Context, keyspace, shard str

// ValidateSchemaKeyspace will diff the schema from all the tablets in
// the keyspace.
func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, excludeTables []string, includeViews, skipNoMaster bool) error {
func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, excludeTables []string, includeViews, skipNoMaster bool, includeVSchema bool) error {
// find all the shards
shards, err := wr.ts.GetShardNames(ctx, keyspace)
if err != nil {
Expand All @@ -217,7 +206,7 @@ func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string,
}
sort.Strings(shards)
if len(shards) == 1 {
return wr.ValidateSchemaShard(ctx, keyspace, shards[0], excludeTables, includeViews, false /*includeVSchema*/)
return wr.ValidateSchemaShard(ctx, keyspace, shards[0], excludeTables, includeViews, includeVSchema)
}

var referenceSchema *tabletmanagerdatapb.SchemaDefinition
Expand All @@ -227,6 +216,15 @@ func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string,
er := concurrency.AllErrorRecorder{}
wg := sync.WaitGroup{}

// If we are checking against the vschema then all shards
// should just be validated individually against it
if includeVSchema {
err := wr.ValidateVSchema(ctx, keyspace, shards, excludeTables, includeViews)
if err != nil {
return err
}
}

// then diffs all tablets in the other shards
for _, shard := range shards[0:] {
si, err := wr.ts.GetShard(ctx, keyspace, shard)
Expand Down Expand Up @@ -273,6 +271,51 @@ func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string,
return nil
}

// ValidateVSchema compares the schema of each primary tablet in "keyspace/shards..." to the vschema and errs if there are differences
func (wr *Wrangler) ValidateVSchema(ctx context.Context, keyspace string, shards []string, excludeTables []string, includeViews bool) error {
vschm, err := wr.ts.GetVSchema(ctx, keyspace)
if err != nil {
return fmt.Errorf("GetVSchema(%s) failed: %v", keyspace, err)
}

shardFailures := concurrency.AllErrorRecorder{}
var wg sync.WaitGroup
wg.Add(len(shards))

for _, shard := range shards {
go func(shard string) {
defer wg.Done()
notFoundTables := []string{}
si, err := wr.ts.GetShard(ctx, keyspace, shard)
if err != nil {
shardFailures.RecordError(fmt.Errorf("GetShard(%v, %v) failed: %v", keyspace, shard, err))
return
}
masterSchema, err := wr.GetSchema(ctx, si.MasterAlias, nil, excludeTables, includeViews)
if err != nil {
shardFailures.RecordError(fmt.Errorf("GetSchema(%s, nil, %v, %v) (%v/%v) failed: %v", si.MasterAlias.String(),
excludeTables, includeViews, keyspace, shard, err,
))
return
}
for _, tableDef := range masterSchema.TableDefinitions {
if _, ok := vschm.Tables[tableDef.Name]; !ok {
notFoundTables = append(notFoundTables, tableDef.Name)
}
}
if len(notFoundTables) > 0 {
shardFailure := fmt.Errorf("%v/%v has tables that are not in the vschema: %v", keyspace, shard, notFoundTables)
shardFailures.RecordError(shardFailure)
}
}(shard)
}
wg.Wait()
if shardFailures.HasErrors() {
return fmt.Errorf("ValidateVSchema(%v, %v, %v, %v) failed: %v", keyspace, shards, excludeTables, includeViews, shardFailures.Error().Error())
}
return nil
}

// PreflightSchema will try a schema change on the remote tablet.
func (wr *Wrangler) PreflightSchema(ctx context.Context, tabletAlias *topodatapb.TabletAlias, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) {
ti, err := wr.ts.GetTablet(ctx, tabletAlias)
Expand Down
61 changes: 60 additions & 1 deletion go/vt/wrangler/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,67 @@ func TestValidateSchemaShard(t *testing.T) {
}
}

// Schema Checks
err := tme.wr.ValidateSchemaShard(ctx, "ks", "-80", nil /*excludeTables*/, true /*includeViews*/, true /*includeVSchema*/)
require.NoError(t, err)
shouldErr := tme.wr.ValidateSchemaShard(ctx, "ks", "80-", nil /*excludeTables*/, true /*includeViews*/, true /*includeVSchema*/)
require.Contains(t, shouldErr.Error(), "Vschema Validation Failed:")
require.Contains(t, shouldErr.Error(), "ks/80- has tables that are not in the vschema:")

// VSchema Specific Checks
err = tme.wr.ValidateVSchema(ctx, "ks", []string{"-80"}, nil /*excludeTables*/, true /*includeViews*/)
require.NoError(t, err)
shouldErr = tme.wr.ValidateVSchema(ctx, "ks", []string{"80-"}, nil /*excludeTables*/, true /*includeVoews*/)
require.Contains(t, shouldErr.Error(), "ks/80- has tables that are not in the vschema:")
}

func TestValidateSchemaKeyspace(t *testing.T) {
ctx := context.Background()
sourceShards := []string{"-80", "80-"}
targetShards := []string{"-40", "40-80", "80-c0", "c0-"}

tmePass := newTestShardMigrater(ctx, t, sourceShards, targetShards)
tmeDiffs := newTestShardMigrater(ctx, t, sourceShards, targetShards)

schm := &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: "not_in_vschema",
Columns: []string{"c1", "c2"},
PrimaryKeyColumns: []string{"c1"},
Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"),
}},
}

// This is the vschema returned by newTestShardMigrater
sameAsVSchema := &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: "t1",
Columns: []string{"c1"},
},
{
Name: "t2",
Columns: []string{"c1"},
},
{
Name: "t3",
Columns: []string{"c1"},
},
},
}

for _, primary := range append(tmePass.sourceMasters, tmePass.targetMasters...) {
primary.FakeMysqlDaemon.Schema = sameAsVSchema
}

for _, primary := range append(tmeDiffs.sourceMasters, tmeDiffs.targetMasters...) {
primary.FakeMysqlDaemon.Schema = schm
}

// Schema Checks
err := tmePass.wr.ValidateSchemaKeyspace(ctx, "ks", nil /*excludeTables*/, true /*includeViews*/, true /*skipNoMaster*/, true /*includeVSchema*/)
require.NoError(t, err)
err = tmePass.wr.ValidateSchemaKeyspace(ctx, "ks", nil /*excludeTables*/, true /*includeViews*/, true /*skipNoMaster*/, false /*includeVSchema*/)
require.NoError(t, err)
shouldErr := tmeDiffs.wr.ValidateSchemaKeyspace(ctx, "ks", nil /*excludeTables*/, true /*includeViews*/, true /*skipNoMaster*/, true /*includeVSchema*/)
require.Error(t, shouldErr)
}
14 changes: 3 additions & 11 deletions go/vt/wrangler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,9 @@ func (vrw *VReplicationWorkflow) Create(ctx context.Context) error {
excludeTables := strings.Split(vrw.params.ExcludeTables, ",")
keyspace := vrw.params.SourceKeyspace

errs := []string{}
for _, shard := range vrw.params.SourceShards {
if err := vrw.wr.ValidateSchemaShard(ctx, keyspace, shard, excludeTables, true /*includeViews*/, true /*includeVschema*/); err != nil {
errMsg := fmt.Sprintf("%s/%s: %s", keyspace, shard, err.Error())
errs = append(errs, errMsg)
}
}

// There were some schema drifts
if len(errs) > 0 {
return fmt.Errorf("Create ReshardWorkflow failed Schema Validation:\n" + strings.Join(errs, "\n"))
vschmErr := vrw.wr.ValidateVSchema(ctx, keyspace, vrw.params.SourceShards, excludeTables, true /*includeViews*/)
if vschmErr != nil {
return fmt.Errorf("Create ReshardWorkflow failed: %v", vschmErr)
}

err = vrw.initReshard()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func TestVRWSchemaValidation(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, vrwf)
shouldErr := vrwf.Create(ctx)
require.Contains(t, shouldErr.Error(), "Create ReshardWorkflow failed Schema Validation")
require.Contains(t, shouldErr.Error(), "Create ReshardWorkflow failed: ValidateVSchema")
}

func TestReshardV2Cancel(t *testing.T) {
Expand Down