diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 6f12fd748bc..30b608f5f45 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -317,6 +317,9 @@ var commands = []commandGroup{ {"CreateLookupVindex", commandCreateLookupVindex, "[-cell=] [-tablet_types=] ", `Create and backfill a lookup vindex. the json_spec must contain the vindex and colvindex specs for the new lookup.`}, + {"ExternalizeVindex", commandExternalizeVindex, + ".", + `Externalize a backfilled vindex.`}, {"Materialize", commandMaterialize, `, example : '{"workflow": "aaa", "source_keyspace": "source", "target_keyspace": "target", "table_settings": [{"target_table": "customer", "source_expression": "select * from customer", "create_ddl": "copy"}]}'`, "Performs materialization based on the json spec."}, @@ -1850,6 +1853,16 @@ func commandCreateLookupVindex(ctx context.Context, wr *wrangler.Wrangler, subFl return wr.CreateLookupVindex(ctx, keyspace, specs, *cell, *tabletTypes) } +func commandExternalizeVindex(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { + if err := subFlags.Parse(args); err != nil { + return err + } + if subFlags.NArg() != 1 { + return fmt.Errorf("one argument is required: keyspace.vindex") + } + return wr.ExternalizeVindex(ctx, subFlags.Arg(0)) +} + func commandMaterialize(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { if err := subFlags.Parse(args); err != nil { return err diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index ea7a235f34b..3d455e0e283 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -26,6 +26,7 @@ import ( "golang.org/x/net/context" "vitess.io/vitess/go/json2" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/key" @@ -419,6 +420,110 @@ func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (stri return "", fmt.Errorf("column %s not found in schema %v", sourceVindexCol, lines) } +// ExternalizeVindex externalizes a lookup vindex that's finished backfilling or has caught up. +func (wr *Wrangler) ExternalizeVindex(ctx context.Context, qualifiedVindexName string) error { + splits := strings.Split(qualifiedVindexName, ".") + if len(splits) != 2 { + return fmt.Errorf("vindex name should be of the form keyspace.vindex: %s", qualifiedVindexName) + } + sourceKeyspace, vindexName := splits[0], splits[1] + sourceVSchema, err := wr.ts.GetVSchema(ctx, sourceKeyspace) + if err != nil { + return err + } + sourceVindex := sourceVSchema.Vindexes[vindexName] + if sourceVindex == nil { + return fmt.Errorf("vindex %s not found in vschema", qualifiedVindexName) + } + qualifiedTableName := sourceVindex.Params["table"] + splits = strings.Split(qualifiedTableName, ".") + if len(splits) != 2 { + return fmt.Errorf("table name in vindex should be of the form keyspace.table: %s", qualifiedTableName) + } + targetKeyspace, targetTableName := splits[0], splits[1] + workflow := targetTableName + "_vdx" + targetShards, err := wr.ts.GetServingShards(ctx, targetKeyspace) + if err != nil { + return err + } + + // Create a parallelizer function. + forAllTargets := func(f func(*topo.ShardInfo) error) error { + var wg sync.WaitGroup + allErrors := &concurrency.AllErrorRecorder{} + for _, targetShard := range targetShards { + wg.Add(1) + go func(targetShard *topo.ShardInfo) { + defer wg.Done() + + if err := f(targetShard); err != nil { + allErrors.RecordError(err) + } + }(targetShard) + } + wg.Wait() + return allErrors.AggrError(vterrors.Aggregate) + } + + err = forAllTargets(func(targetShard *topo.ShardInfo) error { + targetMaster, err := wr.ts.GetTablet(ctx, targetShard.MasterAlias) + if err != nil { + return err + } + p3qr, err := wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, fmt.Sprintf("select id, state, message from _vt.vreplication where workflow=%s and db_name=%s", encodeString(workflow), encodeString(targetMaster.DbName()))) + if err != nil { + return err + } + qr := sqltypes.Proto3ToResult(p3qr) + for _, row := range qr.Rows { + id, err := sqltypes.ToInt64(row[0]) + if err != nil { + return err + } + state := row[1].ToString() + message := row[2].ToString() + if sourceVindex.Owner == "" { + // If there's no owner, all streams need to be running. + if state != binlogplayer.BlpRunning { + return fmt.Errorf("stream %d for %v.%v is not in Running state: %v", id, targetShard.Keyspace(), targetShard.ShardName(), state) + } + } else { + // If there is an owner, all streams need to be stopped after copy. + if state != binlogplayer.BlpStopped || !strings.Contains(message, "Stopped after copy") { + return fmt.Errorf("stream %d for %v.%v is not in Stopped after copy state: %v, %v", id, targetShard.Keyspace(), targetShard.ShardName(), state, message) + } + } + } + return nil + }) + if err != nil { + return err + } + + if sourceVindex.Owner != "" { + // If there is an owner, we have to delete the streams. + err := forAllTargets(func(targetShard *topo.ShardInfo) error { + targetMaster, err := wr.ts.GetTablet(ctx, targetShard.MasterAlias) + if err != nil { + return err + } + query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow=%s", encodeString(targetMaster.DbName()), encodeString(workflow)) + _, err = wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, query) + if err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + } + + // Remove the write_only param and save the source vschema. + delete(sourceVindex.Params, "write_only") + return wr.ts.SaveVSchema(ctx, sourceKeyspace, sourceVSchema) +} + // Materialize performs the steps needed to materialize a list of tables based on the materialization specs. func (wr *Wrangler) Materialize(ctx context.Context, ms *vtctldatapb.MaterializeSettings) error { if err := wr.validateNewWorkflow(ctx, ms.TargetKeyspace, ms.Workflow); err != nil { diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 886f6ecef4a..f3587020b29 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -1290,6 +1290,129 @@ func TestCreateLookupVindexFailures(t *testing.T) { } } +func TestExternalizeVindex(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + } + env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"-80", "80-"}) + defer env.close() + + sourceVSchema := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + "owned": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.lkp", + "from": "c1", + "to": "c2", + "write_only": "true", + }, + Owner: "t1", + }, + "unowned": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.lkp", + "from": "c1", + "to": "c2", + "write_only": "true", + }, + }, + "bad": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "unqualified", + "from": "c1", + "to": "c2", + }, + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }, { + Name: "owned", + Column: "col2", + }}, + }, + }, + } + fields := sqltypes.MakeTestFields( + "id|state|message", + "int64|varbinary|varbinary", + ) + running := sqltypes.MakeTestResult(fields, "1|Running|msg") + stopped := sqltypes.MakeTestResult(fields, "1|Stopped|Stopped after copy") + testcases := []struct { + input string + vrResponse *sqltypes.Result + expectDelete bool + err string + }{{ + input: "sourceks.owned", + vrResponse: stopped, + expectDelete: true, + }, { + input: "sourceks.unowned", + vrResponse: running, + }, { + input: "unqualified", + err: "vindex name should be of the form keyspace.vindex: unqualified", + }, { + input: "sourceks.absent", + err: "vindex sourceks.absent not found in vschema", + }, { + input: "sourceks.bad", + err: "table name in vindex should be of the form keyspace.table: unqualified", + }, { + input: "sourceks.owned", + vrResponse: running, + err: "is not in Stopped after copy state", + }, { + input: "sourceks.unowned", + vrResponse: stopped, + err: "is not in Running state", + }} + for _, tcase := range testcases { + // Resave the source schema for every iteration. + if err := env.topoServ.SaveVSchema(context.Background(), ms.SourceKeyspace, sourceVSchema); err != nil { + t.Fatal(err) + } + if tcase.vrResponse != nil { + validationQuery := "select id, state, message from _vt.vreplication where workflow='lkp_vdx' and db_name='vt_targetks'" + env.tmc.expectVRQuery(200, validationQuery, tcase.vrResponse) + env.tmc.expectVRQuery(210, validationQuery, tcase.vrResponse) + } + + if tcase.expectDelete { + deleteQuery := "delete from _vt.vreplication where db_name='vt_targetks' and workflow='lkp_vdx'" + env.tmc.expectVRQuery(200, deleteQuery, &sqltypes.Result{}) + env.tmc.expectVRQuery(210, deleteQuery, &sqltypes.Result{}) + } + + err := env.wr.ExternalizeVindex(context.Background(), tcase.input) + if tcase.err != "" { + if err == nil || !strings.Contains(err.Error(), tcase.err) { + t.Errorf("ExternalizeVindex(%s) err: %v, must contain %v", tcase.input, err, tcase.err) + } + continue + } + require.NoError(t, err) + + outvschema, err := env.topoServ.GetVSchema(context.Background(), ms.SourceKeyspace) + require.NoError(t, err) + vindexName := strings.Split(tcase.input, ".")[1] + assert.NotContains(t, outvschema.Vindexes[vindexName].Params, "write_only", tcase.input) + } +} + func TestMaterializerOneToOne(t *testing.T) { ms := &vtctldatapb.MaterializeSettings{ Workflow: "workflow",