From 368ccd3cc246b2e2936ef95b038eba5d3249fa87 Mon Sep 17 00:00:00 2001 From: Toliver Jue Date: Thu, 21 May 2020 20:17:24 +0900 Subject: [PATCH 1/5] Vtctl Materialize optimizations Signed-off-by: Toliver Jue --- go/vt/wrangler/materializer.go | 181 ++++++++++++++++-------- go/vt/wrangler/materializer_env_test.go | 23 +-- go/vt/wrangler/materializer_test.go | 4 +- 3 files changed, 142 insertions(+), 66 deletions(-) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 4cab0c0a9c3..dc798d28be4 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -22,6 +22,7 @@ import ( "sync" "text/template" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vtgate/evalengine" "github.com/gogo/protobuf/proto" @@ -50,6 +51,10 @@ type materializer struct { targetShards []*topo.ShardInfo } +const ( + createDDLAsCopy = "copy" +) + // MoveTables initiates moving table(s) over to another keyspace func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, targetKeyspace, tableSpecs, cell, tabletTypes string) error { var tables []string @@ -112,7 +117,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta ms.TableSettings = append(ms.TableSettings, &vtctldatapb.TableMaterializeSettings{ TargetTable: table, SourceExpression: buf.String(), - CreateDdl: "copy", + CreateDdl: createDDLAsCopy, }) } return wr.Materialize(ctx, ms) @@ -583,46 +588,107 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater } func (mz *materializer) deploySchema(ctx context.Context) error { + return mz.forAllTargets(func(target *topo.ShardInfo) error { + needsCopy := false + copyTables := map[string]bool{} + targetTables := map[string]bool{} + for _, ts := range mz.ms.TableSettings { - tableSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, []string{ts.TargetTable}, nil, false) + targetTables[ts.TargetTable] = true + + if ts.CreateDdl == createDDLAsCopy { + needsCopy = true + copyTables[ts.TargetTable] = true + } + } + + // Check for all desired tables in target. + hasTargetTable := map[string]bool{} + { + tableList := make([]string, 0, len(targetTables)) + for table := range targetTables { + tableList = append(tableList, table) + } + + targetSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, tableList, nil, false) + if err != nil { + return err + } + + for _, td := range targetSchema.TableDefinitions { + hasTargetTable[td.Name] = true + } + } + + // Check for all source table schemas that need copying. + tableDDL := map[string]string{} + if needsCopy { + sourceMaster := mz.sourceShards[0].MasterAlias + if sourceMaster == nil { + return fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName()) + } + + tableList := make([]string, 0, len(copyTables)) + for table := range copyTables { + tableList = append(tableList, table) + } + + log.Infof("copy: getting table schemas from source master: %+v...", tableList) + var err error + sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, tableList, nil, false) if err != nil { return err } - if len(tableSchema.TableDefinitions) != 0 { + + // Check for any missing tables in schema after accounting for returned TableDefinitions. + missingTables := copyTables + for _, td := range sourceSchema.TableDefinitions { + tableDDL[td.Name] = td.Schema + + delete(missingTables, td.Name) + } + + if len(missingTables) > 0 { + var tableList []string + for table := range missingTables { + tableList = append(tableList, table) + } + + return fmt.Errorf("copy: source tables do not exist: %+v.", tableList) + } + } + + for _, ts := range mz.ms.TableSettings { + if hasTargetTable[ts.TargetTable] { // Table already exists. continue } if ts.CreateDdl == "" { return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable) } - createddl := ts.CreateDdl - if createddl == "copy" { - sourceTableName, err := sqlparser.TableFromStatement(ts.SourceExpression) - if err != nil { - return err - } - if sourceTableName.Name.String() != ts.TargetTable { - return fmt.Errorf("source and target table names must match for copying schema: %v vs %v", sqlparser.String(sourceTableName), ts.TargetTable) - } - sourceMaster := mz.sourceShards[0].MasterAlias - if sourceMaster == nil { - return fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName()) - } - sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, []string{ts.TargetTable}, nil, false) - if err != nil { - return err - } - if len(sourceSchema.TableDefinitions) == 0 { - return fmt.Errorf("source table %v does not exist", ts.TargetTable) + createDDL := ts.CreateDdl + if createDDL == createDDLAsCopy { + if ts.SourceExpression != "" { + // Check for table if non-empty SourceExpression. + sourceTableName, err := sqlparser.TableFromStatement(ts.SourceExpression) + if err != nil { + return err + } + if sourceTableName.Name.String() != ts.TargetTable { + return fmt.Errorf("source and target table names must match for copying schema: %v vs %v", sqlparser.String(sourceTableName), ts.TargetTable) + } } - createddl = sourceSchema.TableDefinitions[0].Schema + + createDDL = tableDDL[ts.TargetTable] } + targetTablet, err := mz.wr.ts.GetTablet(ctx, target.MasterAlias) if err != nil { return err } - if _, err := mz.wr.tmc.ExecuteFetchAsDba(ctx, targetTablet.Tablet, false, []byte(createddl), 0, false, true); err != nil { + + if _, err := mz.wr.tmc.ExecuteFetchAsDba(ctx, targetTablet.Tablet, false, []byte(createDDL), 0, false, true); err != nil { return err } } @@ -644,45 +710,48 @@ func (mz *materializer) generateInserts(ctx context.Context) (string, error) { rule := &binlogdatapb.Rule{ Match: ts.TargetTable, } - // Validate the query. - stmt, err := sqlparser.Parse(ts.SourceExpression) - if err != nil { - return "", err - } - sel, ok := stmt.(*sqlparser.Select) - if !ok { - return "", fmt.Errorf("unrecognized statement: %s", ts.SourceExpression) - } - if mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { - cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable]) + + if ts.SourceExpression != "" { + // Validate non-empty query. + stmt, err := sqlparser.Parse(ts.SourceExpression) if err != nil { return "", err } - mappedCols := make([]*sqlparser.ColName, 0, len(cv.Columns)) - for _, col := range cv.Columns { - colName, err := matchColInSelect(col, sel) + sel, ok := stmt.(*sqlparser.Select) + if !ok { + return "", fmt.Errorf("unrecognized statement: %s", ts.SourceExpression) + } + if mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { + cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable]) if err != nil { return "", err } - mappedCols = append(mappedCols, colName) - } - subExprs := make(sqlparser.SelectExprs, 0, len(mappedCols)+2) - for _, mappedCol := range mappedCols { - subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: mappedCol}) - } - vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name) - subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrVal([]byte(vindexName))}) - subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrVal([]byte("{{.keyrange}}"))}) - sel.Where = &sqlparser.Where{ - Type: sqlparser.WhereStr, - Expr: &sqlparser.FuncExpr{ - Name: sqlparser.NewColIdent("in_keyrange"), - Exprs: subExprs, - }, + mappedCols := make([]*sqlparser.ColName, 0, len(cv.Columns)) + for _, col := range cv.Columns { + colName, err := matchColInSelect(col, sel) + if err != nil { + return "", err + } + mappedCols = append(mappedCols, colName) + } + subExprs := make(sqlparser.SelectExprs, 0, len(mappedCols)+2) + for _, mappedCol := range mappedCols { + subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: mappedCol}) + } + vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name) + subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrVal([]byte(vindexName))}) + subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrVal([]byte("{{.keyrange}}"))}) + sel.Where = &sqlparser.Where{ + Type: sqlparser.WhereStr, + Expr: &sqlparser.FuncExpr{ + Name: sqlparser.NewColIdent("in_keyrange"), + Exprs: subExprs, + }, + } + rule.Filter = sqlparser.String(sel) + } else { + rule.Filter = ts.SourceExpression } - rule.Filter = sqlparser.String(sel) - } else { - rule.Filter = ts.SourceExpression } bls.Filter.Rules = append(bls.Filter.Rules, rule) } diff --git a/go/vt/wrangler/materializer_env_test.go b/go/vt/wrangler/materializer_env_test.go index ab88b81e3bd..ecace4ac37e 100644 --- a/go/vt/wrangler/materializer_env_test.go +++ b/go/vt/wrangler/materializer_env_test.go @@ -76,18 +76,20 @@ func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, s } for _, ts := range ms.TableSettings { - tablename := ts.TargetTable + tableName := ts.TargetTable table, err := sqlparser.TableFromStatement(ts.SourceExpression) if err == nil { - tablename = table.Name.String() + tableName = table.Name.String() } - env.tmc.schema[ms.SourceKeyspace+"."+tablename] = &tabletmanagerdatapb.SchemaDefinition{ + env.tmc.schema[ms.SourceKeyspace+"."+tableName] = &tabletmanagerdatapb.SchemaDefinition{ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ - Schema: fmt.Sprintf("%s_schema", tablename), + Name: tableName, + Schema: fmt.Sprintf("%s_schema", tableName), }}, } env.tmc.schema[ms.TargetKeyspace+"."+ts.TargetTable] = &tabletmanagerdatapb.SchemaDefinition{ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: ts.TargetTable, Schema: fmt.Sprintf("%s_schema", ts.TargetTable), }}, } @@ -169,11 +171,16 @@ func newTestMaterializerTMClient() *testMaterializerTMClient { } func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { - key := tablet.Keyspace + "." + tables[0] - if tmc.schema[key] == nil { - return &tabletmanagerdatapb.SchemaDefinition{}, nil + schemaDefn := &tabletmanagerdatapb.SchemaDefinition{} + for _, table := range tables { + key := tablet.Keyspace + "." + table + tableDefn := tmc.schema[key] + if tableDefn == nil { + continue + } + schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, tableDefn.TableDefinitions...) } - return tmc.schema[key], nil + return schemaDefn, nil } func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, result *sqltypes.Result) { diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 85370056e31..b23bd8ca95d 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -1970,7 +1970,7 @@ func TestMaterializerTableMismatch(t *testing.T) { delete(env.tmc.schema, "targetks.t1") err := env.wr.Materialize(context.Background(), ms) - assert.EqualError(t, err, "source and target table names must match for copying schema: t2 vs t1") + assert.EqualError(t, err, "copy: source tables do not exist: [t1].") } func TestMaterializerNoSourceTable(t *testing.T) { @@ -1991,7 +1991,7 @@ func TestMaterializerNoSourceTable(t *testing.T) { delete(env.tmc.schema, "sourceks.t1") err := env.wr.Materialize(context.Background(), ms) - assert.EqualError(t, err, "source table t1 does not exist") + assert.EqualError(t, err, "copy: source tables do not exist: [t1].") } func TestMaterializerSyntaxError(t *testing.T) { From 5f4f7f7b86cf5dbf31f497238228f7cc7f97dd5a Mon Sep 17 00:00:00 2001 From: Toliver Jue Date: Fri, 22 May 2020 18:06:12 +0900 Subject: [PATCH 2/5] split materializer test for copy/non-copy Signed-off-by: Toliver Jue --- go/vt/wrangler/materializer_test.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index b23bd8ca95d..741978d1133 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -1953,7 +1953,27 @@ func TestMaterializerNoSourceMaster(t *testing.T) { assert.EqualError(t, err, "source shard must have a master for copying schema: 0") } -func TestMaterializerTableMismatch(t *testing.T) { +func TestMaterializerTableMismatchNonCopy(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + Workflow: "workflow", + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + TableSettings: []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: "t1", + SourceExpression: "select * from t2", + CreateDdl: "", + }}, + } + env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) + defer env.close() + + delete(env.tmc.schema, "targetks.t1") + + err := env.wr.Materialize(context.Background(), ms) + assert.EqualError(t, err, "target table t1 does not exist and there is no create ddl defined") +} + +func TestMaterializerTableMismatchCopy(t *testing.T) { ms := &vtctldatapb.MaterializeSettings{ Workflow: "workflow", SourceKeyspace: "sourceks", From 9171b55ab2f9047f6d5b9a8c301d1f210a4de1c1 Mon Sep 17 00:00:00 2001 From: Toliver Jue Date: Sun, 24 May 2020 14:46:22 +0900 Subject: [PATCH 3/5] always grab full schema from source and target Signed-off-by: Toliver Jue --- go/vt/wrangler/materializer.go | 58 ++++++------------------- go/vt/wrangler/materializer_env_test.go | 12 +++++ go/vt/wrangler/materializer_test.go | 4 +- 3 files changed, 28 insertions(+), 46 deletions(-) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index dc798d28be4..75d335ce015 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -590,28 +590,11 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater func (mz *materializer) deploySchema(ctx context.Context) error { return mz.forAllTargets(func(target *topo.ShardInfo) error { - needsCopy := false - copyTables := map[string]bool{} - targetTables := map[string]bool{} + allTables := []string{"/.*/"} - for _, ts := range mz.ms.TableSettings { - targetTables[ts.TargetTable] = true - - if ts.CreateDdl == createDDLAsCopy { - needsCopy = true - copyTables[ts.TargetTable] = true - } - } - - // Check for all desired tables in target. hasTargetTable := map[string]bool{} { - tableList := make([]string, 0, len(targetTables)) - for table := range targetTables { - tableList = append(tableList, table) - } - - targetSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, tableList, nil, false) + targetSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, allTables, nil, false) if err != nil { return err } @@ -621,41 +604,22 @@ func (mz *materializer) deploySchema(ctx context.Context) error { } } - // Check for all source table schemas that need copying. - tableDDL := map[string]string{} - if needsCopy { + sourceDDL := map[string]string{} + { sourceMaster := mz.sourceShards[0].MasterAlias if sourceMaster == nil { return fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName()) } - tableList := make([]string, 0, len(copyTables)) - for table := range copyTables { - tableList = append(tableList, table) - } - - log.Infof("copy: getting table schemas from source master: %+v...", tableList) + log.Infof("getting table schemas from source master...") var err error - sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, tableList, nil, false) + sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, allTables, nil, false) if err != nil { return err } - // Check for any missing tables in schema after accounting for returned TableDefinitions. - missingTables := copyTables for _, td := range sourceSchema.TableDefinitions { - tableDDL[td.Name] = td.Schema - - delete(missingTables, td.Name) - } - - if len(missingTables) > 0 { - var tableList []string - for table := range missingTables { - tableList = append(tableList, table) - } - - return fmt.Errorf("copy: source tables do not exist: %+v.", tableList) + sourceDDL[td.Name] = td.Schema } } @@ -677,10 +641,16 @@ func (mz *materializer) deploySchema(ctx context.Context) error { } if sourceTableName.Name.String() != ts.TargetTable { return fmt.Errorf("source and target table names must match for copying schema: %v vs %v", sqlparser.String(sourceTableName), ts.TargetTable) + } + } - createDDL = tableDDL[ts.TargetTable] + ddl, ok := sourceDDL[ts.TargetTable] + if !ok { + return fmt.Errorf("source table %v does not exist", ts.TargetTable) + } + createDDL = ddl } targetTablet, err := mz.wr.ts.GetTablet(ctx, target.MasterAlias) diff --git a/go/vt/wrangler/materializer_env_test.go b/go/vt/wrangler/materializer_env_test.go index ecace4ac37e..949a1f4dcdd 100644 --- a/go/vt/wrangler/materializer_env_test.go +++ b/go/vt/wrangler/materializer_env_test.go @@ -19,6 +19,7 @@ package wrangler import ( "fmt" "regexp" + "strings" "sync" "testing" @@ -173,6 +174,17 @@ func newTestMaterializerTMClient() *testMaterializerTMClient { func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { schemaDefn := &tabletmanagerdatapb.SchemaDefinition{} for _, table := range tables { + // TODO: Add generalized regexps if needed for test purposes. + if table == "/.*/" { + // Special case of all tables in keyspace. + for key, tableDefn := range tmc.schema { + if strings.HasPrefix(key, tablet.Keyspace+".") { + schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, tableDefn.TableDefinitions...) + } + } + break + } + key := tablet.Keyspace + "." + table tableDefn := tmc.schema[key] if tableDefn == nil { diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 741978d1133..a2eea6334d7 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -1990,7 +1990,7 @@ func TestMaterializerTableMismatchCopy(t *testing.T) { delete(env.tmc.schema, "targetks.t1") err := env.wr.Materialize(context.Background(), ms) - assert.EqualError(t, err, "copy: source tables do not exist: [t1].") + assert.EqualError(t, err, "source and target table names must match for copying schema: t2 vs t1") } func TestMaterializerNoSourceTable(t *testing.T) { @@ -2011,7 +2011,7 @@ func TestMaterializerNoSourceTable(t *testing.T) { delete(env.tmc.schema, "sourceks.t1") err := env.wr.Materialize(context.Background(), ms) - assert.EqualError(t, err, "copy: source tables do not exist: [t1].") + assert.EqualError(t, err, "source table t1 does not exist") } func TestMaterializerSyntaxError(t *testing.T) { From cc40ce72c4e71d46368f6b6f4f7416befaa1b7f5 Mon Sep 17 00:00:00 2001 From: Toliver Jue Date: Mon, 25 May 2020 03:17:23 +0900 Subject: [PATCH 4/5] unindent, remove else Signed-off-by: Toliver Jue --- go/vt/wrangler/materializer.go | 75 +++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 34 deletions(-) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 75d335ce015..cd3ccf014af 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -681,48 +681,55 @@ func (mz *materializer) generateInserts(ctx context.Context) (string, error) { Match: ts.TargetTable, } - if ts.SourceExpression != "" { - // Validate non-empty query. - stmt, err := sqlparser.Parse(ts.SourceExpression) + if ts.SourceExpression == "" { + bls.Filter.Rules = append(bls.Filter.Rules, rule) + continue + } + + // Validate non-empty query. + stmt, err := sqlparser.Parse(ts.SourceExpression) + if err != nil { + return "", err + } + sel, ok := stmt.(*sqlparser.Select) + if !ok { + return "", fmt.Errorf("unrecognized statement: %s", ts.SourceExpression) + } + + filter := ts.SourceExpression + if mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { + cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable]) if err != nil { return "", err } - sel, ok := stmt.(*sqlparser.Select) - if !ok { - return "", fmt.Errorf("unrecognized statement: %s", ts.SourceExpression) - } - if mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { - cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable]) + mappedCols := make([]*sqlparser.ColName, 0, len(cv.Columns)) + for _, col := range cv.Columns { + colName, err := matchColInSelect(col, sel) if err != nil { return "", err } - mappedCols := make([]*sqlparser.ColName, 0, len(cv.Columns)) - for _, col := range cv.Columns { - colName, err := matchColInSelect(col, sel) - if err != nil { - return "", err - } - mappedCols = append(mappedCols, colName) - } - subExprs := make(sqlparser.SelectExprs, 0, len(mappedCols)+2) - for _, mappedCol := range mappedCols { - subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: mappedCol}) - } - vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name) - subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrVal([]byte(vindexName))}) - subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrVal([]byte("{{.keyrange}}"))}) - sel.Where = &sqlparser.Where{ - Type: sqlparser.WhereStr, - Expr: &sqlparser.FuncExpr{ - Name: sqlparser.NewColIdent("in_keyrange"), - Exprs: subExprs, - }, - } - rule.Filter = sqlparser.String(sel) - } else { - rule.Filter = ts.SourceExpression + mappedCols = append(mappedCols, colName) + } + subExprs := make(sqlparser.SelectExprs, 0, len(mappedCols)+2) + for _, mappedCol := range mappedCols { + subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: mappedCol}) } + vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name) + subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrVal([]byte(vindexName))}) + subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrVal([]byte("{{.keyrange}}"))}) + sel.Where = &sqlparser.Where{ + Type: sqlparser.WhereStr, + Expr: &sqlparser.FuncExpr{ + Name: sqlparser.NewColIdent("in_keyrange"), + Exprs: subExprs, + }, + } + + filter = sqlparser.String(sel) } + + rule.Filter = filter + bls.Filter.Rules = append(bls.Filter.Rules, rule) } ig.AddRow(mz.ms.Workflow, bls, "", mz.ms.Cell, mz.ms.TabletTypes) From ccdcb632088baab9c508f14e1a28e7cc497a9116 Mon Sep 17 00:00:00 2001 From: Toliver Jue Date: Mon, 25 May 2020 03:53:19 +0900 Subject: [PATCH 5/5] empty source expression test Signed-off-by: Toliver Jue --- go/vt/wrangler/materializer_test.go | 38 ++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index a2eea6334d7..6e8b5496029 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -1418,15 +1418,23 @@ func TestMaterializerOneToOne(t *testing.T) { Workflow: "workflow", SourceKeyspace: "sourceks", TargetKeyspace: "targetks", - TableSettings: []*vtctldatapb.TableMaterializeSettings{{ - TargetTable: "t1", - SourceExpression: "select * from t1", - CreateDdl: "t1ddl", - }, { - TargetTable: "t2", - SourceExpression: "select * from t3", - CreateDdl: "t2ddl", - }}, + TableSettings: []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: "t1", + SourceExpression: "select * from t1", + CreateDdl: "t1ddl", + }, + { + TargetTable: "t2", + SourceExpression: "select * from t3", + CreateDdl: "t2ddl", + }, + { + TargetTable: "t4", + SourceExpression: "", // empty + CreateDdl: "t4ddl", + }, + }, } env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) defer env.close() @@ -1434,8 +1442,16 @@ func TestMaterializerOneToOne(t *testing.T) { env.tmc.expectVRQuery( 200, insertPrefix+ - `\('workflow', 'keyspace:\\"sourceks\\" shard:\\"0\\" filter: rules: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_targetks'\)`+ - eol, + `\(`+ + `'workflow', `+ + (`'keyspace:\\"sourceks\\" shard:\\"0\\" `+ + `filter:<`+ + `rules: `+ + `rules: `+ + `rules: `+ + `> ', `)+ + `'', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_targetks'`+ + `\)`+eol, &sqltypes.Result{}, ) env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})