Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,37 @@ const (
createDDLAsCopyDropForeignKeys = "copy:drop_foreign_keys"
)

// addTablesToVSchema adds tables to an (unsharded) vschema. Depending on copyAttributes It will also add any sequence info
// that is associated with a table by copying it from the vschema of the source keyspace.
// For a migrate workflow we do not copy attributes since the source keyspace is just a proxy to import data into Vitess
// Todo: For now we only copy sequence but later we may also want to copy other attributes like authoritative column flag and list of columns
func (wr *Wrangler) addTablesToVSchema(ctx context.Context, sourceKeyspace string, targetVSchema *vschemapb.Keyspace, tables []string, copyAttributes bool) error {
// addTablesToVSchema adds tables to an (unsharded) vschema if they are not already defined.
// If copyVSchema is true then we copy over the vschema table definitions from the source,
// otherwise we create empty ones.
// For a migrate workflow we do not copy the vschema since the source keyspace is just a
// proxy to import data into Vitess.
func (wr *Wrangler) addTablesToVSchema(ctx context.Context, sourceKeyspace string, targetVSchema *vschemapb.Keyspace, tables []string, copyVSchema bool) error {
if targetVSchema.Tables == nil {
targetVSchema.Tables = make(map[string]*vschemapb.Table)
}
for _, table := range tables {
targetVSchema.Tables[table] = &vschemapb.Table{}
}

if copyAttributes { // if source keyspace is provided, copy over the sequence info.
if copyVSchema {
srcVSchema, err := wr.ts.GetVSchema(ctx, sourceKeyspace)
if err != nil {
return err
return vterrors.Wrapf(err, "failed to get vschema for source keyspace %s", sourceKeyspace)
}
for _, table := range tables {
srcTable, ok := srcVSchema.Tables[table]
if ok {
targetVSchema.Tables[table].AutoIncrement = srcTable.AutoIncrement
srcTable, sok := srcVSchema.Tables[table]
if _, tok := targetVSchema.Tables[table]; sok && !tok {
targetVSchema.Tables[table] = srcTable
// If going from sharded to unsharded, then we need to remove the
// column vindexes as they are not valid for unsharded tables.
if srcVSchema.Sharded {
targetVSchema.Tables[table].ColumnVindexes = nil
}
}
}

}
// Ensure that each table at least has an empty definition on the target.
for _, table := range tables {
if _, tok := targetVSchema.Tables[table]; !tok {
targetVSchema.Tables[table] = &vschemapb.Table{}
}
}
return nil
}
Expand Down
193 changes: 193 additions & 0 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

const mzUpdateQuery = "update _vt.vreplication set state='Running' where db_name='vt_targetks' and workflow='workflow'"
Expand Down Expand Up @@ -2833,3 +2834,195 @@ func TestMoveTablesDDLFlag(t *testing.T) {
})
}
}

func TestAddTablesToVSchema(t *testing.T) {
ctx := context.Background()
ts := memorytopo.NewServer("zone1")
srcks := "source"
wr := &Wrangler{
logger: logutil.NewMemoryLogger(),
ts: ts,
sourceTs: ts,
}
tests := []struct {
name string
sourceVSchema *vschemapb.Keyspace
inTargetVSchema *vschemapb.Keyspace
tables []string
copyVSchema bool
wantTargetVSchema *vschemapb.Keyspace
}{
{
name: "no target vschema; copy source vschema",
sourceVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {
Type: vindexes.TypeSequence,
},
"t3": {
AutoIncrement: &vschemapb.AutoIncrement{
Column: "c1",
Sequence: "t2",
},
},
},
},
inTargetVSchema: &vschemapb.Keyspace{},
tables: []string{"t1", "t2", "t3", "t4"},
copyVSchema: true,
wantTargetVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {
Type: vindexes.TypeSequence,
},
"t3": {
AutoIncrement: &vschemapb.AutoIncrement{
Column: "c1",
Sequence: "t2",
},
},
"t4": {},
},
},
},
{
name: "no target vschema; copy source vschema; sharded source",
sourceVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {
Type: vindexes.TypeSequence,
},
"t3": {
AutoIncrement: &vschemapb.AutoIncrement{
Column: "c1",
Sequence: "t2",
},
},
"t4": {
ColumnVindexes: []*vschemapb.ColumnVindex{ // Should be stripped on target
{
Column: "c1",
Name: "hash",
},
},
},
},
},
inTargetVSchema: &vschemapb.Keyspace{},
tables: []string{"t1", "t2", "t3", "t4"},
copyVSchema: true,
wantTargetVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {
Type: vindexes.TypeSequence,
},
"t3": {
AutoIncrement: &vschemapb.AutoIncrement{
Column: "c1",
Sequence: "t2",
},
},
"t4": {},
},
},
},
{
name: "target vschema; copy source vschema",
sourceVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {
Type: vindexes.TypeSequence,
},
"t3": {
AutoIncrement: &vschemapb.AutoIncrement{
Column: "c1",
Sequence: "t2",
},
},
"t4": {
ColumnVindexes: []*vschemapb.ColumnVindex{ // Should be stripped on target
{
Column: "c1",
Name: "hash",
},
},
},
},
},
inTargetVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {},
"t3": {},
"t4": {},
},
},
tables: []string{"t1", "t2", "t3", "t4"},
copyVSchema: true,
wantTargetVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {},
"t3": {},
"t4": {},
},
},
},
{
name: "no target vschema; do not copy source vschema",
sourceVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {
Type: vindexes.TypeReference,
},
"t2": {
Type: vindexes.TypeSequence,
},
"t3": {
AutoIncrement: &vschemapb.AutoIncrement{
Column: "c1",
Sequence: "t2",
},
},
},
},
inTargetVSchema: &vschemapb.Keyspace{},
tables: []string{"t1", "t2"},
copyVSchema: false,
wantTargetVSchema: &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {},
"t2": {},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.SaveVSchema(ctx, srcks, tt.sourceVSchema)
err := wr.addTablesToVSchema(ctx, srcks, tt.inTargetVSchema, tt.tables, tt.copyVSchema)
require.NoError(t, err)
require.Equal(t, tt.wantTargetVSchema, tt.inTargetVSchema)
})
}
}