Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 73 additions & 27 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"text/template"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtgate/evalengine"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -50,6 +51,10 @@ type materializer struct {
targetShards []*topo.ShardInfo
}

const (
createDDLAsCopy = "copy"
)

// MoveTables initiates moving table(s) over to another keyspace
func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, targetKeyspace, tableSpecs, cell, tabletTypes string) error {
var tables []string
Expand Down Expand Up @@ -112,7 +117,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
ms.TableSettings = append(ms.TableSettings, &vtctldatapb.TableMaterializeSettings{
TargetTable: table,
SourceExpression: buf.String(),
CreateDdl: "copy",
CreateDdl: createDDLAsCopy,
})
}
return wr.Materialize(ctx, ms)
Expand Down Expand Up @@ -583,46 +588,77 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater
}

func (mz *materializer) deploySchema(ctx context.Context) error {

return mz.forAllTargets(func(target *topo.ShardInfo) error {
for _, ts := range mz.ms.TableSettings {
tableSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, []string{ts.TargetTable}, nil, false)
allTables := []string{"/.*/"}

hasTargetTable := map[string]bool{}
{
targetSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, allTables, nil, false)
if err != nil {
return err
}
if len(tableSchema.TableDefinitions) != 0 {

for _, td := range targetSchema.TableDefinitions {
hasTargetTable[td.Name] = true
}
}

sourceDDL := map[string]string{}
{
sourceMaster := mz.sourceShards[0].MasterAlias
if sourceMaster == nil {
return fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName())
}

log.Infof("getting table schemas from source master...")
var err error
sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, allTables, nil, false)
if err != nil {
return err
}

for _, td := range sourceSchema.TableDefinitions {
sourceDDL[td.Name] = td.Schema
}
}

for _, ts := range mz.ms.TableSettings {
if hasTargetTable[ts.TargetTable] {
// Table already exists.
continue
}
if ts.CreateDdl == "" {
return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable)
}
createddl := ts.CreateDdl
if createddl == "copy" {
sourceTableName, err := sqlparser.TableFromStatement(ts.SourceExpression)
if err != nil {
return err
}
if sourceTableName.Name.String() != ts.TargetTable {
return fmt.Errorf("source and target table names must match for copying schema: %v vs %v", sqlparser.String(sourceTableName), ts.TargetTable)
}
sourceMaster := mz.sourceShards[0].MasterAlias
if sourceMaster == nil {
return fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName())
}
sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, []string{ts.TargetTable}, nil, false)
if err != nil {
return err
createDDL := ts.CreateDdl
if createDDL == createDDLAsCopy {
if ts.SourceExpression != "" {
// Check for table if non-empty SourceExpression.
sourceTableName, err := sqlparser.TableFromStatement(ts.SourceExpression)
if err != nil {
return err
}
if sourceTableName.Name.String() != ts.TargetTable {
return fmt.Errorf("source and target table names must match for copying schema: %v vs %v", sqlparser.String(sourceTableName), ts.TargetTable)

}

}
if len(sourceSchema.TableDefinitions) == 0 {

ddl, ok := sourceDDL[ts.TargetTable]
if !ok {
return fmt.Errorf("source table %v does not exist", ts.TargetTable)
}
createddl = sourceSchema.TableDefinitions[0].Schema
createDDL = ddl
}

targetTablet, err := mz.wr.ts.GetTablet(ctx, target.MasterAlias)
if err != nil {
return err
}
if _, err := mz.wr.tmc.ExecuteFetchAsDba(ctx, targetTablet.Tablet, false, []byte(createddl), 0, false, true); err != nil {

if _, err := mz.wr.tmc.ExecuteFetchAsDba(ctx, targetTablet.Tablet, false, []byte(createDDL), 0, false, true); err != nil {
return err
}
}
Expand All @@ -644,7 +680,13 @@ func (mz *materializer) generateInserts(ctx context.Context) (string, error) {
rule := &binlogdatapb.Rule{
Match: ts.TargetTable,
}
// Validate the query.

if ts.SourceExpression == "" {
bls.Filter.Rules = append(bls.Filter.Rules, rule)
continue
}

// Validate non-empty query.
stmt, err := sqlparser.Parse(ts.SourceExpression)
if err != nil {
return "", err
Expand All @@ -653,6 +695,8 @@ func (mz *materializer) generateInserts(ctx context.Context) (string, error) {
if !ok {
return "", fmt.Errorf("unrecognized statement: %s", ts.SourceExpression)
}

filter := ts.SourceExpression
if mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference {
cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable])
if err != nil {
Expand Down Expand Up @@ -680,10 +724,12 @@ func (mz *materializer) generateInserts(ctx context.Context) (string, error) {
Exprs: subExprs,
},
}
rule.Filter = sqlparser.String(sel)
} else {
rule.Filter = ts.SourceExpression

filter = sqlparser.String(sel)
}

rule.Filter = filter
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice.


bls.Filter.Rules = append(bls.Filter.Rules, rule)
}
ig.AddRow(mz.ms.Workflow, bls, "", mz.ms.Cell, mz.ms.TabletTypes)
Expand Down
35 changes: 27 additions & 8 deletions go/vt/wrangler/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package wrangler
import (
"fmt"
"regexp"
"strings"
"sync"
"testing"

Expand Down Expand Up @@ -76,18 +77,20 @@ func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, s
}

for _, ts := range ms.TableSettings {
tablename := ts.TargetTable
tableName := ts.TargetTable
table, err := sqlparser.TableFromStatement(ts.SourceExpression)
if err == nil {
tablename = table.Name.String()
tableName = table.Name.String()
}
env.tmc.schema[ms.SourceKeyspace+"."+tablename] = &tabletmanagerdatapb.SchemaDefinition{
env.tmc.schema[ms.SourceKeyspace+"."+tableName] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Schema: fmt.Sprintf("%s_schema", tablename),
Name: tableName,
Schema: fmt.Sprintf("%s_schema", tableName),
}},
}
env.tmc.schema[ms.TargetKeyspace+"."+ts.TargetTable] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: ts.TargetTable,
Schema: fmt.Sprintf("%s_schema", ts.TargetTable),
}},
}
Expand Down Expand Up @@ -169,11 +172,27 @@ func newTestMaterializerTMClient() *testMaterializerTMClient {
}

func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) {
key := tablet.Keyspace + "." + tables[0]
if tmc.schema[key] == nil {
return &tabletmanagerdatapb.SchemaDefinition{}, nil
schemaDefn := &tabletmanagerdatapb.SchemaDefinition{}
for _, table := range tables {
// TODO: Add generalized regexps if needed for test purposes.
if table == "/.*/" {
// Special case of all tables in keyspace.
for key, tableDefn := range tmc.schema {
if strings.HasPrefix(key, tablet.Keyspace+".") {
schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, tableDefn.TableDefinitions...)
}
}
break
}

key := tablet.Keyspace + "." + table
tableDefn := tmc.schema[key]
if tableDefn == nil {
continue
}
schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, tableDefn.TableDefinitions...)
}
return tmc.schema[key], nil
return schemaDefn, nil
}

func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, result *sqltypes.Result) {
Expand Down
60 changes: 48 additions & 12 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1418,24 +1418,40 @@ func TestMaterializerOneToOne(t *testing.T) {
Workflow: "workflow",
SourceKeyspace: "sourceks",
TargetKeyspace: "targetks",
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: "t1",
SourceExpression: "select * from t1",
CreateDdl: "t1ddl",
}, {
TargetTable: "t2",
SourceExpression: "select * from t3",
CreateDdl: "t2ddl",
}},
TableSettings: []*vtctldatapb.TableMaterializeSettings{
{
TargetTable: "t1",
SourceExpression: "select * from t1",
CreateDdl: "t1ddl",
},
{
TargetTable: "t2",
SourceExpression: "select * from t3",
CreateDdl: "t2ddl",
},
{
TargetTable: "t4",
SourceExpression: "", // empty
CreateDdl: "t4ddl",
},
},
}
env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"})
defer env.close()

env.tmc.expectVRQuery(
200,
insertPrefix+
`\('workflow', 'keyspace:\\"sourceks\\" shard:\\"0\\" filter:<rules:<match:\\"t1\\" filter:\\"select.*t1\\" > rules:<match:\\"t2\\" filter:\\"select.*t3\\" > > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_targetks'\)`+
eol,
`\(`+
`'workflow', `+
(`'keyspace:\\"sourceks\\" shard:\\"0\\" `+
`filter:<`+
`rules:<match:\\"t1\\" filter:\\"select.*t1\\" > `+
`rules:<match:\\"t2\\" filter:\\"select.*t3\\" > `+
`rules:<match:\\"t4\\" > `+
`> ', `)+
`'', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_targetks'`+
`\)`+eol,
&sqltypes.Result{},
)
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})
Expand Down Expand Up @@ -1953,7 +1969,27 @@ func TestMaterializerNoSourceMaster(t *testing.T) {
assert.EqualError(t, err, "source shard must have a master for copying schema: 0")
}

func TestMaterializerTableMismatch(t *testing.T) {
func TestMaterializerTableMismatchNonCopy(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
SourceKeyspace: "sourceks",
TargetKeyspace: "targetks",
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: "t1",
SourceExpression: "select * from t2",
CreateDdl: "",
}},
}
env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"})
defer env.close()

delete(env.tmc.schema, "targetks.t1")

err := env.wr.Materialize(context.Background(), ms)
assert.EqualError(t, err, "target table t1 does not exist and there is no create ddl defined")
}

func TestMaterializerTableMismatchCopy(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
SourceKeyspace: "sourceks",
Expand Down