From 3479947cb769c26367b864ea80c410188000ce34 Mon Sep 17 00:00:00 2001 From: Malcolm Akinje Date: Fri, 30 Apr 2021 13:25:37 -0700 Subject: [PATCH] added ValidateVSchema and ValidateVSchemaKeyspace Signed-off-by: Malcolm Akinje --- go/vt/vtctl/vtctl.go | 10 +++-- go/vt/vtctld/vtctld.go | 2 +- go/vt/wrangler/schema.go | 73 ++++++++++++++++++++++++++------- go/vt/wrangler/schema_test.go | 61 ++++++++++++++++++++++++++- go/vt/wrangler/workflow.go | 14 ++----- go/vt/wrangler/workflow_test.go | 2 +- 6 files changed, 129 insertions(+), 33 deletions(-) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index c475e53d163..1119ee9f326 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -396,10 +396,10 @@ var commands = []commandGroup{ "[-concurrency=10] [-include_master=false] ", "Reloads the schema on all the tablets in a keyspace."}, {"ValidateSchemaShard", commandValidateSchemaShard, - "[-exclude_tables=''] [-include-views] ", + "[-exclude_tables=''] [-include-views] [-include-vschema] ", "Validates that the master schema matches all of the replicas."}, {"ValidateSchemaKeyspace", commandValidateSchemaKeyspace, - "[-exclude_tables=''] [-include-views] [-skip-no-master] ", + "[-exclude_tables=''] [-include-views] [-skip-no-master] [-include-vschema] ", "Validates that the master schema from shard 0 matches the schema on all of the other tablets in the keyspace."}, {"ApplySchema", commandApplySchema, "[-allow_long_unavailability] [-wait_replicas_timeout=10s] [-ddl_strategy=] [-request_context=] [-skip_preflight] {-sql= || -sql-file=} ", @@ -2768,6 +2768,7 @@ func commandReloadSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, sub func commandValidateSchemaShard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { excludeTables := subFlags.String("exclude_tables", "", "Specifies a comma-separated list of tables to exclude. Each is either an exact match, or a regular expression of the form /regexp/") includeViews := subFlags.Bool("include-views", false, "Includes views in the validation") + includeVSchema := subFlags.Bool("include-vschema", false, "Validate schemas against the vschema") if err := subFlags.Parse(args); err != nil { return err } @@ -2783,13 +2784,14 @@ func commandValidateSchemaShard(ctx context.Context, wr *wrangler.Wrangler, subF if *excludeTables != "" { excludeTableArray = strings.Split(*excludeTables, ",") } - return wr.ValidateSchemaShard(ctx, keyspace, shard, excludeTableArray, *includeViews, false /*includeVSchema*/) + return wr.ValidateSchemaShard(ctx, keyspace, shard, excludeTableArray, *includeViews, *includeVSchema) } func commandValidateSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { excludeTables := subFlags.String("exclude_tables", "", "Specifies a comma-separated list of tables to exclude. Each is either an exact match, or a regular expression of the form /regexp/") includeViews := subFlags.Bool("include-views", false, "Includes views in the validation") skipNoMaster := subFlags.Bool("skip-no-master", false, "Skip shards that don't have master when performing validation") + includeVSchema := subFlags.Bool("include-vschema", false, "Validate schemas against the vschema") if err := subFlags.Parse(args); err != nil { return err } @@ -2802,7 +2804,7 @@ func commandValidateSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, s if *excludeTables != "" { excludeTableArray = strings.Split(*excludeTables, ",") } - return wr.ValidateSchemaKeyspace(ctx, keyspace, excludeTableArray, *includeViews, *skipNoMaster) + return wr.ValidateSchemaKeyspace(ctx, keyspace, excludeTableArray, *includeViews, *skipNoMaster, *includeVSchema) } func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { diff --git a/go/vt/vtctld/vtctld.go b/go/vt/vtctld/vtctld.go index e60a05f412e..66efd262f68 100644 --- a/go/vt/vtctld/vtctld.go +++ b/go/vt/vtctld/vtctld.go @@ -60,7 +60,7 @@ func InitVtctld(ts *topo.Server) { actionRepo.RegisterKeyspaceAction("ValidateSchemaKeyspace", func(ctx context.Context, wr *wrangler.Wrangler, keyspace string) (string, error) { - return "", wr.ValidateSchemaKeyspace(ctx, keyspace, nil, false, false) + return "", wr.ValidateSchemaKeyspace(ctx, keyspace, nil /*excludeTables*/, false /*includeViews*/, false /*skipNoMaster*/, false /*includeVSchema*/) }) actionRepo.RegisterKeyspaceAction("ValidateVersionKeyspace", diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go index f1c8e9573f7..487c63bc5e0 100644 --- a/go/vt/wrangler/schema.go +++ b/go/vt/wrangler/schema.go @@ -160,20 +160,9 @@ func (wr *Wrangler) ValidateSchemaShard(ctx context.Context, keyspace, shard str } if includeVSchema { - vschm, err := wr.ts.GetVSchema(ctx, keyspace) + err := wr.ValidateVSchema(ctx, keyspace, []string{shard}, excludeTables, includeViews) if err != nil { - return fmt.Errorf("GetVSchema(%s) failed: %v", keyspace, err) - } - notFoundTables := []string{} - - for _, tableDef := range masterSchema.TableDefinitions { - if _, ok := vschm.Tables[tableDef.Name]; !ok { - notFoundTables = append(notFoundTables, tableDef.Name) - } - } - - if len(notFoundTables) > 0 { - return fmt.Errorf("Vschema Validation Failed: the following tables were not found in the vschema %v", notFoundTables) + return err } } @@ -204,7 +193,7 @@ func (wr *Wrangler) ValidateSchemaShard(ctx context.Context, keyspace, shard str // ValidateSchemaKeyspace will diff the schema from all the tablets in // the keyspace. -func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, excludeTables []string, includeViews, skipNoMaster bool) error { +func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, excludeTables []string, includeViews, skipNoMaster bool, includeVSchema bool) error { // find all the shards shards, err := wr.ts.GetShardNames(ctx, keyspace) if err != nil { @@ -217,7 +206,7 @@ func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, } sort.Strings(shards) if len(shards) == 1 { - return wr.ValidateSchemaShard(ctx, keyspace, shards[0], excludeTables, includeViews, false /*includeVSchema*/) + return wr.ValidateSchemaShard(ctx, keyspace, shards[0], excludeTables, includeViews, includeVSchema) } var referenceSchema *tabletmanagerdatapb.SchemaDefinition @@ -227,6 +216,15 @@ func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, er := concurrency.AllErrorRecorder{} wg := sync.WaitGroup{} + // If we are checking against the vschema then all shards + // should just be validated individually against it + if includeVSchema { + err := wr.ValidateVSchema(ctx, keyspace, shards, excludeTables, includeViews) + if err != nil { + return err + } + } + // then diffs all tablets in the other shards for _, shard := range shards[0:] { si, err := wr.ts.GetShard(ctx, keyspace, shard) @@ -273,6 +271,51 @@ func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, return nil } +// ValidateVSchema compares the schema of each primary tablet in "keyspace/shards..." to the vschema and errs if there are differences +func (wr *Wrangler) ValidateVSchema(ctx context.Context, keyspace string, shards []string, excludeTables []string, includeViews bool) error { + vschm, err := wr.ts.GetVSchema(ctx, keyspace) + if err != nil { + return fmt.Errorf("GetVSchema(%s) failed: %v", keyspace, err) + } + + shardFailures := concurrency.AllErrorRecorder{} + var wg sync.WaitGroup + wg.Add(len(shards)) + + for _, shard := range shards { + go func(shard string) { + defer wg.Done() + notFoundTables := []string{} + si, err := wr.ts.GetShard(ctx, keyspace, shard) + if err != nil { + shardFailures.RecordError(fmt.Errorf("GetShard(%v, %v) failed: %v", keyspace, shard, err)) + return + } + masterSchema, err := wr.GetSchema(ctx, si.MasterAlias, nil, excludeTables, includeViews) + if err != nil { + shardFailures.RecordError(fmt.Errorf("GetSchema(%s, nil, %v, %v) (%v/%v) failed: %v", si.MasterAlias.String(), + excludeTables, includeViews, keyspace, shard, err, + )) + return + } + for _, tableDef := range masterSchema.TableDefinitions { + if _, ok := vschm.Tables[tableDef.Name]; !ok { + notFoundTables = append(notFoundTables, tableDef.Name) + } + } + if len(notFoundTables) > 0 { + shardFailure := fmt.Errorf("%v/%v has tables that are not in the vschema: %v", keyspace, shard, notFoundTables) + shardFailures.RecordError(shardFailure) + } + }(shard) + } + wg.Wait() + if shardFailures.HasErrors() { + return fmt.Errorf("ValidateVSchema(%v, %v, %v, %v) failed: %v", keyspace, shards, excludeTables, includeViews, shardFailures.Error().Error()) + } + return nil +} + // PreflightSchema will try a schema change on the remote tablet. func (wr *Wrangler) PreflightSchema(ctx context.Context, tabletAlias *topodatapb.TabletAlias, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) { ti, err := wr.ts.GetTablet(ctx, tabletAlias) diff --git a/go/vt/wrangler/schema_test.go b/go/vt/wrangler/schema_test.go index a82e2ecd056..6c77a190f3e 100644 --- a/go/vt/wrangler/schema_test.go +++ b/go/vt/wrangler/schema_test.go @@ -68,8 +68,67 @@ func TestValidateSchemaShard(t *testing.T) { } } + // Schema Checks err := tme.wr.ValidateSchemaShard(ctx, "ks", "-80", nil /*excludeTables*/, true /*includeViews*/, true /*includeVSchema*/) require.NoError(t, err) shouldErr := tme.wr.ValidateSchemaShard(ctx, "ks", "80-", nil /*excludeTables*/, true /*includeViews*/, true /*includeVSchema*/) - require.Contains(t, shouldErr.Error(), "Vschema Validation Failed:") + require.Contains(t, shouldErr.Error(), "ks/80- has tables that are not in the vschema:") + + // VSchema Specific Checks + err = tme.wr.ValidateVSchema(ctx, "ks", []string{"-80"}, nil /*excludeTables*/, true /*includeViews*/) + require.NoError(t, err) + shouldErr = tme.wr.ValidateVSchema(ctx, "ks", []string{"80-"}, nil /*excludeTables*/, true /*includeVoews*/) + require.Contains(t, shouldErr.Error(), "ks/80- has tables that are not in the vschema:") +} + +func TestValidateSchemaKeyspace(t *testing.T) { + ctx := context.Background() + sourceShards := []string{"-80", "80-"} + targetShards := []string{"-40", "40-80", "80-c0", "c0-"} + + tmePass := newTestShardMigrater(ctx, t, sourceShards, targetShards) + tmeDiffs := newTestShardMigrater(ctx, t, sourceShards, targetShards) + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "not_in_vschema", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + + // This is the vschema returned by newTestShardMigrater + sameAsVSchema := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: "t1", + Columns: []string{"c1"}, + }, + { + Name: "t2", + Columns: []string{"c1"}, + }, + { + Name: "t3", + Columns: []string{"c1"}, + }, + }, + } + + for _, primary := range append(tmePass.sourceMasters, tmePass.targetMasters...) { + primary.FakeMysqlDaemon.Schema = sameAsVSchema + } + + for _, primary := range append(tmeDiffs.sourceMasters, tmeDiffs.targetMasters...) { + primary.FakeMysqlDaemon.Schema = schm + } + + // Schema Checks + err := tmePass.wr.ValidateSchemaKeyspace(ctx, "ks", nil /*excludeTables*/, true /*includeViews*/, true /*skipNoMaster*/, true /*includeVSchema*/) + require.NoError(t, err) + err = tmePass.wr.ValidateSchemaKeyspace(ctx, "ks", nil /*excludeTables*/, true /*includeViews*/, true /*skipNoMaster*/, false /*includeVSchema*/) + require.NoError(t, err) + shouldErr := tmeDiffs.wr.ValidateSchemaKeyspace(ctx, "ks", nil /*excludeTables*/, true /*includeViews*/, true /*skipNoMaster*/, true /*includeVSchema*/) + require.Error(t, shouldErr) } diff --git a/go/vt/wrangler/workflow.go b/go/vt/wrangler/workflow.go index 70bff2621c4..b0b048024ed 100644 --- a/go/vt/wrangler/workflow.go +++ b/go/vt/wrangler/workflow.go @@ -178,17 +178,9 @@ func (vrw *VReplicationWorkflow) Create(ctx context.Context) error { excludeTables := strings.Split(vrw.params.ExcludeTables, ",") keyspace := vrw.params.SourceKeyspace - errs := []string{} - for _, shard := range vrw.params.SourceShards { - if err := vrw.wr.ValidateSchemaShard(ctx, keyspace, shard, excludeTables, true /*includeViews*/, true /*includeVschema*/); err != nil { - errMsg := fmt.Sprintf("%s/%s: %s", keyspace, shard, err.Error()) - errs = append(errs, errMsg) - } - } - - // There were some schema drifts - if len(errs) > 0 { - return fmt.Errorf("Create ReshardWorkflow failed Schema Validation:\n" + strings.Join(errs, "\n")) + vschmErr := vrw.wr.ValidateVSchema(ctx, keyspace, vrw.params.SourceShards, excludeTables, true /*includeViews*/) + if vschmErr != nil { + return fmt.Errorf("Create ReshardWorkflow failed: %v", vschmErr) } err = vrw.initReshard() diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go index 0e9a566ec42..b2325b3e4e1 100644 --- a/go/vt/wrangler/workflow_test.go +++ b/go/vt/wrangler/workflow_test.go @@ -417,7 +417,7 @@ func TestVRWSchemaValidation(t *testing.T) { require.NoError(t, err) require.NotNil(t, vrwf) shouldErr := vrwf.Create(ctx) - require.Contains(t, shouldErr.Error(), "Create ReshardWorkflow failed Schema Validation") + require.Contains(t, shouldErr.Error(), "Create ReshardWorkflow failed: ValidateVSchema") } func TestReshardV2Cancel(t *testing.T) {