diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 67f4476adbb..dab7a37c3e5 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -589,43 +589,46 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater }, nil } +func (mz *materializer) getSourceTableDDLs(ctx context.Context) (map[string]string, error) { + sourceDDLs := make(map[string]string) + allTables := []string{"/.*/"} + + sourceMaster := mz.sourceShards[0].MasterAlias + if sourceMaster == nil { + return nil, fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName()) + } + + log.Infof("getting table schemas from source master %v...", sourceMaster) + var err error + sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, allTables, nil, false) + if err != nil { + return nil, err + } + log.Infof("got table schemas from source master %v.", sourceMaster) + + for _, td := range sourceSchema.TableDefinitions { + sourceDDLs[td.Name] = td.Schema + } + return sourceDDLs, nil +} + func (mz *materializer) deploySchema(ctx context.Context) error { + var sourceDDLs map[string]string + var mu sync.Mutex return mz.forAllTargets(func(target *topo.ShardInfo) error { allTables := []string{"/.*/"} hasTargetTable := map[string]bool{} - { - log.Infof("getting table schemas from target master %v...", target.MasterAlias) - targetSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, allTables, nil, false) - if err != nil { - return err - } - log.Infof("got table schemas from target master %v.", target.MasterAlias) - - for _, td := range targetSchema.TableDefinitions { - hasTargetTable[td.Name] = true - } + log.Infof("getting table schemas from target master %v...", target.MasterAlias) + targetSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, allTables, nil, false) + if err != nil { + return err } + log.Infof("got table schemas from target master %v.", target.MasterAlias) - sourceDDL := map[string]string{} - { - sourceMaster := mz.sourceShards[0].MasterAlias - if sourceMaster == nil { - return fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName()) - } - - log.Infof("getting table schemas from source master %v...", sourceMaster) - var err error - sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, allTables, nil, false) - if err != nil { - return err - } - log.Infof("got table schemas from source master %v.", sourceMaster) - - for _, td := range sourceSchema.TableDefinitions { - sourceDDL[td.Name] = td.Schema - } + for _, td := range targetSchema.TableDefinitions { + hasTargetTable[td.Name] = true } targetTablet, err := mz.wr.ts.GetTablet(ctx, target.MasterAlias) @@ -633,7 +636,7 @@ func (mz *materializer) deploySchema(ctx context.Context) error { return err } - applyDDLs := []string{} + var applyDDLs []string for _, ts := range mz.ms.TableSettings { if hasTargetTable[ts.TargetTable] { // Table already exists. @@ -642,6 +645,21 @@ func (mz *materializer) deploySchema(ctx context.Context) error { if ts.CreateDdl == "" { return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable) } + + var err error + mu.Lock() + if len(sourceDDLs) == 0 { + //only get ddls for tables, once and lazily: if we need to copy the schema from source to target + //we copy schemas from masters on the source keyspace + //and we have found use cases where user just has a replica (no master) in the source keyspace + sourceDDLs, err = mz.getSourceTableDDLs(ctx) + } + mu.Unlock() + if err != nil { + log.Errorf("Error getting DDLs of source tables: %s", err.Error()) + return err + } + createDDL := ts.CreateDdl if createDDL == createDDLAsCopy || createDDL == createDDLAsCopyDropConstraint { if ts.SourceExpression != "" { @@ -656,7 +674,7 @@ func (mz *materializer) deploySchema(ctx context.Context) error { } } - ddl, ok := sourceDDL[ts.TargetTable] + ddl, ok := sourceDDLs[ts.TargetTable] if !ok { return fmt.Errorf("source table %v does not exist", ts.TargetTable) } diff --git a/go/vt/wrangler/materializer_env_test.go b/go/vt/wrangler/materializer_env_test.go index 6c8df72d50a..3142b0d0326 100644 --- a/go/vt/wrangler/materializer_env_test.go +++ b/go/vt/wrangler/materializer_env_test.go @@ -19,6 +19,7 @@ package wrangler import ( "fmt" "regexp" + "strconv" "strings" "sync" "testing" @@ -63,7 +64,6 @@ func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, s tmc: newTestMaterializerTMClient(), } env.wr = New(logutil.NewConsoleLogger(), env.topoServ, env.tmc) - tabletID := 100 for _, shard := range sources { _ = env.addTablet(tabletID, env.ms.SourceKeyspace, shard, topodatapb.TabletType_MASTER) @@ -161,18 +161,41 @@ type testMaterializerTMClient struct { tmclient.TabletManagerClient schema map[string]*tabletmanagerdatapb.SchemaDefinition - mu sync.Mutex - vrQueries map[int][]*queryResult + mu sync.Mutex + vrQueries map[int][]*queryResult + getSchemaCounts map[string]int + muSchemaCount sync.Mutex } func newTestMaterializerTMClient() *testMaterializerTMClient { return &testMaterializerTMClient{ - schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), - vrQueries: make(map[int][]*queryResult), + schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), + vrQueries: make(map[int][]*queryResult), + getSchemaCounts: make(map[string]int), } } +func (tmc *testMaterializerTMClient) schemaRequested(uid uint32) { + tmc.muSchemaCount.Lock() + defer tmc.muSchemaCount.Unlock() + key := strconv.Itoa(int(uid)) + n, ok := tmc.getSchemaCounts[key] + if !ok { + tmc.getSchemaCounts[key] = 1 + } else { + tmc.getSchemaCounts[key] = n + 1 + } +} + +func (tmc *testMaterializerTMClient) getSchemaRequestCount(uid uint32) int { + tmc.muSchemaCount.Lock() + defer tmc.muSchemaCount.Unlock() + key := strconv.Itoa(int(uid)) + return tmc.getSchemaCounts[key] +} + func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { + tmc.schemaRequested(tablet.Alias.Uid) schemaDefn := &tabletmanagerdatapb.SchemaDefinition{} for _, table := range tables { // TODO: Add generalized regexps if needed for test purposes. diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 2b7f88f665d..7181fa95c31 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -1699,6 +1699,8 @@ func TestMaterializerDeploySchema(t *testing.T) { err := env.wr.Materialize(context.Background(), ms) assert.NoError(t, err) env.tmc.verifyQueries(t) + require.Equal(t, env.tmc.getSchemaRequestCount(100), 1) + require.Equal(t, env.tmc.getSchemaRequestCount(200), 1) } func TestMaterializerCopySchema(t *testing.T) { @@ -1734,6 +1736,9 @@ func TestMaterializerCopySchema(t *testing.T) { err := env.wr.Materialize(context.Background(), ms) assert.NoError(t, err) env.tmc.verifyQueries(t) + require.Equal(t, env.tmc.getSchemaRequestCount(100), 1) + require.Equal(t, env.tmc.getSchemaRequestCount(200), 1) + } func TestMaterializerExplicitColumns(t *testing.T) { @@ -1922,6 +1927,9 @@ func TestMaterializerNoDDL(t *testing.T) { err := env.wr.Materialize(context.Background(), ms) assert.EqualError(t, err, "target table t1 does not exist and there is no create ddl defined") + require.Equal(t, env.tmc.getSchemaRequestCount(100), 0) + require.Equal(t, env.tmc.getSchemaRequestCount(200), 1) + } func TestMaterializerNoSourceMaster(t *testing.T) {