diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 4cab0c0a9c3..cd3ccf014af 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -22,6 +22,7 @@ import ( "sync" "text/template" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vtgate/evalengine" "github.com/gogo/protobuf/proto" @@ -50,6 +51,10 @@ type materializer struct { targetShards []*topo.ShardInfo } +const ( + createDDLAsCopy = "copy" +) + // MoveTables initiates moving table(s) over to another keyspace func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, targetKeyspace, tableSpecs, cell, tabletTypes string) error { var tables []string @@ -112,7 +117,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta ms.TableSettings = append(ms.TableSettings, &vtctldatapb.TableMaterializeSettings{ TargetTable: table, SourceExpression: buf.String(), - CreateDdl: "copy", + CreateDdl: createDDLAsCopy, }) } return wr.Materialize(ctx, ms) @@ -583,46 +588,77 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater } func (mz *materializer) deploySchema(ctx context.Context) error { + return mz.forAllTargets(func(target *topo.ShardInfo) error { - for _, ts := range mz.ms.TableSettings { - tableSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, []string{ts.TargetTable}, nil, false) + allTables := []string{"/.*/"} + + hasTargetTable := map[string]bool{} + { + targetSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, allTables, nil, false) if err != nil { return err } - if len(tableSchema.TableDefinitions) != 0 { + + for _, td := range targetSchema.TableDefinitions { + hasTargetTable[td.Name] = true + } + } + + sourceDDL := map[string]string{} + { + sourceMaster := mz.sourceShards[0].MasterAlias + if sourceMaster == nil { + return fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName()) + } + + log.Infof("getting table schemas from source master...") + var err error + sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, allTables, nil, false) + if err != nil { + return err + } + + for _, td := range sourceSchema.TableDefinitions { + sourceDDL[td.Name] = td.Schema + } + } + + for _, ts := range mz.ms.TableSettings { + if hasTargetTable[ts.TargetTable] { // Table already exists. continue } if ts.CreateDdl == "" { return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable) } - createddl := ts.CreateDdl - if createddl == "copy" { - sourceTableName, err := sqlparser.TableFromStatement(ts.SourceExpression) - if err != nil { - return err - } - if sourceTableName.Name.String() != ts.TargetTable { - return fmt.Errorf("source and target table names must match for copying schema: %v vs %v", sqlparser.String(sourceTableName), ts.TargetTable) - } - sourceMaster := mz.sourceShards[0].MasterAlias - if sourceMaster == nil { - return fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName()) - } - sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, []string{ts.TargetTable}, nil, false) - if err != nil { - return err + createDDL := ts.CreateDdl + if createDDL == createDDLAsCopy { + if ts.SourceExpression != "" { + // Check for table if non-empty SourceExpression. + sourceTableName, err := sqlparser.TableFromStatement(ts.SourceExpression) + if err != nil { + return err + } + if sourceTableName.Name.String() != ts.TargetTable { + return fmt.Errorf("source and target table names must match for copying schema: %v vs %v", sqlparser.String(sourceTableName), ts.TargetTable) + + } + } - if len(sourceSchema.TableDefinitions) == 0 { + + ddl, ok := sourceDDL[ts.TargetTable] + if !ok { return fmt.Errorf("source table %v does not exist", ts.TargetTable) } - createddl = sourceSchema.TableDefinitions[0].Schema + createDDL = ddl } + targetTablet, err := mz.wr.ts.GetTablet(ctx, target.MasterAlias) if err != nil { return err } - if _, err := mz.wr.tmc.ExecuteFetchAsDba(ctx, targetTablet.Tablet, false, []byte(createddl), 0, false, true); err != nil { + + if _, err := mz.wr.tmc.ExecuteFetchAsDba(ctx, targetTablet.Tablet, false, []byte(createDDL), 0, false, true); err != nil { return err } } @@ -644,7 +680,13 @@ func (mz *materializer) generateInserts(ctx context.Context) (string, error) { rule := &binlogdatapb.Rule{ Match: ts.TargetTable, } - // Validate the query. + + if ts.SourceExpression == "" { + bls.Filter.Rules = append(bls.Filter.Rules, rule) + continue + } + + // Validate non-empty query. stmt, err := sqlparser.Parse(ts.SourceExpression) if err != nil { return "", err @@ -653,6 +695,8 @@ func (mz *materializer) generateInserts(ctx context.Context) (string, error) { if !ok { return "", fmt.Errorf("unrecognized statement: %s", ts.SourceExpression) } + + filter := ts.SourceExpression if mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable]) if err != nil { @@ -680,10 +724,12 @@ func (mz *materializer) generateInserts(ctx context.Context) (string, error) { Exprs: subExprs, }, } - rule.Filter = sqlparser.String(sel) - } else { - rule.Filter = ts.SourceExpression + + filter = sqlparser.String(sel) } + + rule.Filter = filter + bls.Filter.Rules = append(bls.Filter.Rules, rule) } ig.AddRow(mz.ms.Workflow, bls, "", mz.ms.Cell, mz.ms.TabletTypes) diff --git a/go/vt/wrangler/materializer_env_test.go b/go/vt/wrangler/materializer_env_test.go index ab88b81e3bd..949a1f4dcdd 100644 --- a/go/vt/wrangler/materializer_env_test.go +++ b/go/vt/wrangler/materializer_env_test.go @@ -19,6 +19,7 @@ package wrangler import ( "fmt" "regexp" + "strings" "sync" "testing" @@ -76,18 +77,20 @@ func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, s } for _, ts := range ms.TableSettings { - tablename := ts.TargetTable + tableName := ts.TargetTable table, err := sqlparser.TableFromStatement(ts.SourceExpression) if err == nil { - tablename = table.Name.String() + tableName = table.Name.String() } - env.tmc.schema[ms.SourceKeyspace+"."+tablename] = &tabletmanagerdatapb.SchemaDefinition{ + env.tmc.schema[ms.SourceKeyspace+"."+tableName] = &tabletmanagerdatapb.SchemaDefinition{ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ - Schema: fmt.Sprintf("%s_schema", tablename), + Name: tableName, + Schema: fmt.Sprintf("%s_schema", tableName), }}, } env.tmc.schema[ms.TargetKeyspace+"."+ts.TargetTable] = &tabletmanagerdatapb.SchemaDefinition{ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: ts.TargetTable, Schema: fmt.Sprintf("%s_schema", ts.TargetTable), }}, } @@ -169,11 +172,27 @@ func newTestMaterializerTMClient() *testMaterializerTMClient { } func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { - key := tablet.Keyspace + "." + tables[0] - if tmc.schema[key] == nil { - return &tabletmanagerdatapb.SchemaDefinition{}, nil + schemaDefn := &tabletmanagerdatapb.SchemaDefinition{} + for _, table := range tables { + // TODO: Add generalized regexps if needed for test purposes. + if table == "/.*/" { + // Special case of all tables in keyspace. + for key, tableDefn := range tmc.schema { + if strings.HasPrefix(key, tablet.Keyspace+".") { + schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, tableDefn.TableDefinitions...) + } + } + break + } + + key := tablet.Keyspace + "." + table + tableDefn := tmc.schema[key] + if tableDefn == nil { + continue + } + schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, tableDefn.TableDefinitions...) } - return tmc.schema[key], nil + return schemaDefn, nil } func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, result *sqltypes.Result) { diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 85370056e31..6e8b5496029 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -1418,15 +1418,23 @@ func TestMaterializerOneToOne(t *testing.T) { Workflow: "workflow", SourceKeyspace: "sourceks", TargetKeyspace: "targetks", - TableSettings: []*vtctldatapb.TableMaterializeSettings{{ - TargetTable: "t1", - SourceExpression: "select * from t1", - CreateDdl: "t1ddl", - }, { - TargetTable: "t2", - SourceExpression: "select * from t3", - CreateDdl: "t2ddl", - }}, + TableSettings: []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: "t1", + SourceExpression: "select * from t1", + CreateDdl: "t1ddl", + }, + { + TargetTable: "t2", + SourceExpression: "select * from t3", + CreateDdl: "t2ddl", + }, + { + TargetTable: "t4", + SourceExpression: "", // empty + CreateDdl: "t4ddl", + }, + }, } env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) defer env.close() @@ -1434,8 +1442,16 @@ func TestMaterializerOneToOne(t *testing.T) { env.tmc.expectVRQuery( 200, insertPrefix+ - `\('workflow', 'keyspace:\\"sourceks\\" shard:\\"0\\" filter: rules: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_targetks'\)`+ - eol, + `\(`+ + `'workflow', `+ + (`'keyspace:\\"sourceks\\" shard:\\"0\\" `+ + `filter:<`+ + `rules: `+ + `rules: `+ + `rules: `+ + `> ', `)+ + `'', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_targetks'`+ + `\)`+eol, &sqltypes.Result{}, ) env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{}) @@ -1953,7 +1969,27 @@ func TestMaterializerNoSourceMaster(t *testing.T) { assert.EqualError(t, err, "source shard must have a master for copying schema: 0") } -func TestMaterializerTableMismatch(t *testing.T) { +func TestMaterializerTableMismatchNonCopy(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + Workflow: "workflow", + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + TableSettings: []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: "t1", + SourceExpression: "select * from t2", + CreateDdl: "", + }}, + } + env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) + defer env.close() + + delete(env.tmc.schema, "targetks.t1") + + err := env.wr.Materialize(context.Background(), ms) + assert.EqualError(t, err, "target table t1 does not exist and there is no create ddl defined") +} + +func TestMaterializerTableMismatchCopy(t *testing.T) { ms := &vtctldatapb.MaterializeSettings{ Workflow: "workflow", SourceKeyspace: "sourceks",