Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ var commands = []commandGroup{
{"CreateLookupVindex", commandCreateLookupVindex,
"[-cell=<cell>] [-tablet_types=<source_tablet_types>] <keyspace> <json_spec>",
`Create and backfill a lookup vindex. the json_spec must contain the vindex and colvindex specs for the new lookup.`},
{"ExternalizeVindex", commandExternalizeVindex,
"<keyspace>.<vindex>",
`Externalize a backfilled vindex.`},
{"Materialize", commandMaterialize,
`<json_spec>, example : '{"workflow": "aaa", "source_keyspace": "source", "target_keyspace": "target", "table_settings": [{"target_table": "customer", "source_expression": "select * from customer", "create_ddl": "copy"}]}'`,
"Performs materialization based on the json spec."},
Expand Down Expand Up @@ -1850,6 +1853,16 @@ func commandCreateLookupVindex(ctx context.Context, wr *wrangler.Wrangler, subFl
return wr.CreateLookupVindex(ctx, keyspace, specs, *cell, *tabletTypes)
}

func commandExternalizeVindex(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 1 {
return fmt.Errorf("one argument is required: keyspace.vindex")
}
return wr.ExternalizeVindex(ctx, subFlags.Arg(0))
}

func commandMaterialize(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
if err := subFlags.Parse(args); err != nil {
return err
Expand Down
105 changes: 105 additions & 0 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"golang.org/x/net/context"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/key"
Expand Down Expand Up @@ -419,6 +420,110 @@ func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (stri
return "", fmt.Errorf("column %s not found in schema %v", sourceVindexCol, lines)
}

// ExternalizeVindex externalizes a lookup vindex that's finished backfilling or has caught up.
func (wr *Wrangler) ExternalizeVindex(ctx context.Context, qualifiedVindexName string) error {
splits := strings.Split(qualifiedVindexName, ".")
if len(splits) != 2 {
return fmt.Errorf("vindex name should be of the form keyspace.vindex: %s", qualifiedVindexName)
}
sourceKeyspace, vindexName := splits[0], splits[1]
sourceVSchema, err := wr.ts.GetVSchema(ctx, sourceKeyspace)
if err != nil {
return err
}
sourceVindex := sourceVSchema.Vindexes[vindexName]
if sourceVindex == nil {
return fmt.Errorf("vindex %s not found in vschema", qualifiedVindexName)
}
qualifiedTableName := sourceVindex.Params["table"]
splits = strings.Split(qualifiedTableName, ".")
if len(splits) != 2 {
return fmt.Errorf("table name in vindex should be of the form keyspace.table: %s", qualifiedTableName)
}
targetKeyspace, targetTableName := splits[0], splits[1]
workflow := targetTableName + "_vdx"
targetShards, err := wr.ts.GetServingShards(ctx, targetKeyspace)
if err != nil {
return err
}

// Create a parallelizer function.
forAllTargets := func(f func(*topo.ShardInfo) error) error {
var wg sync.WaitGroup
allErrors := &concurrency.AllErrorRecorder{}
for _, targetShard := range targetShards {
wg.Add(1)
go func(targetShard *topo.ShardInfo) {
defer wg.Done()

if err := f(targetShard); err != nil {
allErrors.RecordError(err)
}
}(targetShard)
}
wg.Wait()
return allErrors.AggrError(vterrors.Aggregate)
}

err = forAllTargets(func(targetShard *topo.ShardInfo) error {
targetMaster, err := wr.ts.GetTablet(ctx, targetShard.MasterAlias)
if err != nil {
return err
}
p3qr, err := wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, fmt.Sprintf("select id, state, message from _vt.vreplication where workflow=%s and db_name=%s", encodeString(workflow), encodeString(targetMaster.DbName())))
if err != nil {
return err
}
qr := sqltypes.Proto3ToResult(p3qr)
for _, row := range qr.Rows {
id, err := sqltypes.ToInt64(row[0])
if err != nil {
return err
}
state := row[1].ToString()
message := row[2].ToString()
if sourceVindex.Owner == "" {
// If there's no owner, all streams need to be running.
if state != binlogplayer.BlpRunning {
return fmt.Errorf("stream %d for %v.%v is not in Running state: %v", id, targetShard.Keyspace(), targetShard.ShardName(), state)
}
} else {
// If there is an owner, all streams need to be stopped after copy.
if state != binlogplayer.BlpStopped || !strings.Contains(message, "Stopped after copy") {
return fmt.Errorf("stream %d for %v.%v is not in Stopped after copy state: %v, %v", id, targetShard.Keyspace(), targetShard.ShardName(), state, message)
}
}
}
return nil
})
if err != nil {
return err
}

if sourceVindex.Owner != "" {
// If there is an owner, we have to delete the streams.
err := forAllTargets(func(targetShard *topo.ShardInfo) error {
targetMaster, err := wr.ts.GetTablet(ctx, targetShard.MasterAlias)
if err != nil {
return err
}
query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow=%s", encodeString(targetMaster.DbName()), encodeString(workflow))
_, err = wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, query)
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
}

// Remove the write_only param and save the source vschema.
delete(sourceVindex.Params, "write_only")
return wr.ts.SaveVSchema(ctx, sourceKeyspace, sourceVSchema)
}

// Materialize performs the steps needed to materialize a list of tables based on the materialization specs.
func (wr *Wrangler) Materialize(ctx context.Context, ms *vtctldatapb.MaterializeSettings) error {
if err := wr.validateNewWorkflow(ctx, ms.TargetKeyspace, ms.Workflow); err != nil {
Expand Down
123 changes: 123 additions & 0 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,129 @@ func TestCreateLookupVindexFailures(t *testing.T) {
}
}

func TestExternalizeVindex(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
SourceKeyspace: "sourceks",
TargetKeyspace: "targetks",
}
env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"-80", "80-"})
defer env.close()

sourceVSchema := &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"hash": {
Type: "hash",
},
"owned": {
Type: "lookup_unique",
Params: map[string]string{
"table": "targetks.lkp",
"from": "c1",
"to": "c2",
"write_only": "true",
},
Owner: "t1",
},
"unowned": {
Type: "lookup_unique",
Params: map[string]string{
"table": "targetks.lkp",
"from": "c1",
"to": "c2",
"write_only": "true",
},
},
"bad": {
Type: "lookup_unique",
Params: map[string]string{
"table": "unqualified",
"from": "c1",
"to": "c2",
},
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "hash",
Column: "col1",
}, {
Name: "owned",
Column: "col2",
}},
},
},
}
fields := sqltypes.MakeTestFields(
"id|state|message",
"int64|varbinary|varbinary",
)
running := sqltypes.MakeTestResult(fields, "1|Running|msg")
stopped := sqltypes.MakeTestResult(fields, "1|Stopped|Stopped after copy")
testcases := []struct {
input string
vrResponse *sqltypes.Result
expectDelete bool
err string
}{{
input: "sourceks.owned",
vrResponse: stopped,
expectDelete: true,
}, {
input: "sourceks.unowned",
vrResponse: running,
}, {
input: "unqualified",
err: "vindex name should be of the form keyspace.vindex: unqualified",
}, {
input: "sourceks.absent",
err: "vindex sourceks.absent not found in vschema",
}, {
input: "sourceks.bad",
err: "table name in vindex should be of the form keyspace.table: unqualified",
}, {
input: "sourceks.owned",
vrResponse: running,
err: "is not in Stopped after copy state",
}, {
input: "sourceks.unowned",
vrResponse: stopped,
err: "is not in Running state",
}}
for _, tcase := range testcases {
// Resave the source schema for every iteration.
if err := env.topoServ.SaveVSchema(context.Background(), ms.SourceKeyspace, sourceVSchema); err != nil {
t.Fatal(err)
}
if tcase.vrResponse != nil {
validationQuery := "select id, state, message from _vt.vreplication where workflow='lkp_vdx' and db_name='vt_targetks'"
env.tmc.expectVRQuery(200, validationQuery, tcase.vrResponse)
env.tmc.expectVRQuery(210, validationQuery, tcase.vrResponse)
}

if tcase.expectDelete {
deleteQuery := "delete from _vt.vreplication where db_name='vt_targetks' and workflow='lkp_vdx'"
env.tmc.expectVRQuery(200, deleteQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(210, deleteQuery, &sqltypes.Result{})
}

err := env.wr.ExternalizeVindex(context.Background(), tcase.input)
if tcase.err != "" {
if err == nil || !strings.Contains(err.Error(), tcase.err) {
t.Errorf("ExternalizeVindex(%s) err: %v, must contain %v", tcase.input, err, tcase.err)
}
continue
}
require.NoError(t, err)

outvschema, err := env.topoServ.GetVSchema(context.Background(), ms.SourceKeyspace)
require.NoError(t, err)
vindexName := strings.Split(tcase.input, ".")[1]
assert.NotContains(t, outvschema.Vindexes[vindexName].Params, "write_only", tcase.input)
}
}

func TestMaterializerOneToOne(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
Expand Down