From 5ed5fa04eeeee11553782e453b76a94c318eac90 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 18 Jul 2020 15:31:41 +0200 Subject: [PATCH 1/4] Vrepl Crosscell: add cell/tablettypes to reshard command Signed-off-by: Rohit Nayak --- go/vt/vtctl/vtctl.go | 4 +++- go/vt/wrangler/resharder.go | 12 +++++++---- go/vt/wrangler/resharder_test.go | 34 ++++++++++++++++---------------- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 4947e0a8177..f0bea2f3000 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -1851,6 +1851,8 @@ func commandValidateKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlag } func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { + cell := subFlags.String("cell", "", "Cell to replicate from.") + tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from.") skipSchemaCopy := subFlags.Bool("skip_schema_copy", false, "Skip copying of schema to targets") if err := subFlags.Parse(args); err != nil { return err @@ -1864,7 +1866,7 @@ func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.F } source := strings.Split(subFlags.Arg(1), ",") target := strings.Split(subFlags.Arg(2), ",") - return wr.Reshard(ctx, keyspace, workflow, source, target, *skipSchemaCopy) + return wr.Reshard(ctx, keyspace, workflow, source, target, *skipSchemaCopy, *cell, *tabletTypes) } func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { diff --git a/go/vt/wrangler/resharder.go b/go/vt/wrangler/resharder.go index fec719d55ad..bc12ef570ef 100644 --- a/go/vt/wrangler/resharder.go +++ b/go/vt/wrangler/resharder.go @@ -47,6 +47,8 @@ type resharder struct { targetMasters map[string]*topo.TabletInfo vschema *vschemapb.Keyspace refStreams map[string]*refStream + cell string + tabletTypes string } type refStream struct { @@ -57,12 +59,12 @@ type refStream struct { } // Reshard initiates a resharding workflow. -func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sources, targets []string, skipSchemaCopy bool) error { +func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sources, targets []string, skipSchemaCopy bool, cell, tabletTypes string) error { if err := wr.validateNewWorkflow(ctx, keyspace, workflow); err != nil { return err } - rs, err := wr.buildResharder(ctx, keyspace, workflow, sources, targets) + rs, err := wr.buildResharder(ctx, keyspace, workflow, sources, targets, cell, tabletTypes) if err != nil { return vterrors.Wrap(err, "buildResharder") } @@ -80,13 +82,15 @@ func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sour return nil } -func (wr *Wrangler) buildResharder(ctx context.Context, keyspace, workflow string, sources, targets []string) (*resharder, error) { +func (wr *Wrangler) buildResharder(ctx context.Context, keyspace, workflow string, sources, targets []string, cell, tabletTypes string) (*resharder, error) { rs := &resharder{ wr: wr, keyspace: keyspace, workflow: workflow, sourceMasters: make(map[string]*topo.TabletInfo), targetMasters: make(map[string]*topo.TabletInfo), + cell: cell, + tabletTypes: tabletTypes, } for _, shard := range sources { si, err := wr.ts.GetShard(ctx, keyspace, shard) @@ -298,7 +302,7 @@ func (rs *resharder) createStreams(ctx context.Context) error { Shard: source.ShardName(), Filter: filter, } - ig.AddRow(rs.workflow, bls, "", "", "") + ig.AddRow(rs.workflow, bls, "", rs.cell, rs.tabletTypes) } for _, rstream := range rs.refStreams { diff --git a/go/vt/wrangler/resharder_test.go b/go/vt/wrangler/resharder_test.go index d9214c8248d..21eab01c2cd 100644 --- a/go/vt/wrangler/resharder_test.go +++ b/go/vt/wrangler/resharder_test.go @@ -68,7 +68,7 @@ func TestResharderOneToMany(t *testing.T) { env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "") assert.NoError(t, err) env.tmc.verifyQueries(t) } @@ -101,7 +101,7 @@ func TestResharderManyToOne(t *testing.T) { env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "") assert.NoError(t, err) env.tmc.verifyQueries(t) } @@ -142,7 +142,7 @@ func TestResharderManyToMany(t *testing.T) { env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "") assert.NoError(t, err) env.tmc.verifyQueries(t) } @@ -195,7 +195,7 @@ func TestResharderOneRefTable(t *testing.T) { env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "") assert.NoError(t, err) env.tmc.verifyQueries(t) } @@ -263,7 +263,7 @@ func TestResharderOneRefStream(t *testing.T) { env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "") assert.NoError(t, err) env.tmc.verifyQueries(t) } @@ -340,7 +340,7 @@ func TestResharderNoRefStream(t *testing.T) { env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "") assert.NoError(t, err) env.tmc.verifyQueries(t) } @@ -384,7 +384,7 @@ func TestResharderCopySchema(t *testing.T) { env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, false) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, false, "", "") assert.NoError(t, err) env.tmc.verifyQueries(t) } @@ -412,7 +412,7 @@ func TestResharderDupWorkflow(t *testing.T) { ) env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), result) - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "") assert.EqualError(t, err, "workflow resharderTest already exists in keyspace ks") env.tmc.verifyQueries(t) } @@ -434,19 +434,19 @@ func TestResharderServingState(t *testing.T) { env.tmc.expectVRQuery(100, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) env.tmc.expectVRQuery(200, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, []string{"-80"}, nil, true) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, []string{"-80"}, nil, true, "", "") assert.EqualError(t, err, "buildResharder: source shard -80 is not in serving state") env.tmc.expectVRQuery(100, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) env.tmc.expectVRQuery(200, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) - err = env.wr.Reshard(context.Background(), env.keyspace, env.workflow, []string{"0"}, []string{"0"}, true) + err = env.wr.Reshard(context.Background(), env.keyspace, env.workflow, []string{"0"}, []string{"0"}, true, "", "") assert.EqualError(t, err, "buildResharder: target shard 0 is in serving state") env.tmc.expectVRQuery(100, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) env.tmc.expectVRQuery(200, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) - err = env.wr.Reshard(context.Background(), env.keyspace, env.workflow, []string{"0"}, []string{"-80"}, true) + err = env.wr.Reshard(context.Background(), env.keyspace, env.workflow, []string{"0"}, []string{"-80"}, true, "", "") assert.EqualError(t, err, "buildResharder: ValidateForReshard: source and target keyranges don't match: - vs -80") } @@ -476,7 +476,7 @@ func TestResharderTargetAlreadyResharding(t *testing.T) { env.tmc.expectVRQuery(200, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s'", env.keyspace), result) env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s'", env.keyspace), &sqltypes.Result{}) - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "") assert.EqualError(t, err, "buildResharder: validateTargets: some streams already exist in the target shards, please clean them up and retry the command") env.tmc.verifyQueries(t) } @@ -524,7 +524,7 @@ func TestResharderUnnamedStream(t *testing.T) { ) env.tmc.expectVRQuery(100, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result) - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "") assert.EqualError(t, err, "buildResharder: readRefStreams: VReplication streams must have named workflows for migration: shard: ks:0") env.tmc.verifyQueries(t) } @@ -588,7 +588,7 @@ func TestResharderMismatchedRefStreams(t *testing.T) { ) env.tmc.expectVRQuery(110, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result2) - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "") want := "buildResharder: readRefStreams: streams are mismatched across source shards" if err == nil || !strings.HasPrefix(err.Error(), want) { t.Errorf("Reshard err: %v, want %v", err, want) @@ -628,7 +628,7 @@ func TestResharderTableNotInVSchema(t *testing.T) { ) env.tmc.expectVRQuery(100, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result) - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "") assert.EqualError(t, err, "buildResharder: readRefStreams: blsIsReference: table t1 not found in vschema") env.tmc.verifyQueries(t) } @@ -692,7 +692,7 @@ func TestResharderMixedTablesOrder1(t *testing.T) { ) env.tmc.expectVRQuery(100, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result) - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "") want := "buildResharder: readRefStreams: blsIsReference: cannot reshard streams with a mix of reference and sharded tables" if err == nil || !strings.HasPrefix(err.Error(), want) { t.Errorf("Reshard err: %v, want %v", err.Error(), want) @@ -759,7 +759,7 @@ func TestResharderMixedTablesOrder2(t *testing.T) { ) env.tmc.expectVRQuery(100, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result) - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "") want := "buildResharder: readRefStreams: blsIsReference: cannot reshard streams with a mix of reference and sharded tables" if err == nil || !strings.HasPrefix(err.Error(), want) { t.Errorf("Reshard err: %v, want %v", err.Error(), want) From d5192f1893482f96453bb8e5cef847f66291a4df Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 20 Jul 2020 14:13:43 +0200 Subject: [PATCH 2/4] VRepl Cross-cell source: add cell/tablet_types for reshard, rename cell to cells (breaking change), updated cross-cell tests to add reshard Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/cluster.go | 28 ++-- .../vreplication/vreplication_test.go | 148 ++++++++++-------- go/vt/vtctl/vtctl.go | 12 +- go/vt/wrangler/schema.go | 2 +- 4 files changed, 109 insertions(+), 81 deletions(-) diff --git a/go/test/endtoend/vreplication/cluster.go b/go/test/endtoend/vreplication/cluster.go index fb45df780c6..cf4713cd36b 100644 --- a/go/test/endtoend/vreplication/cluster.go +++ b/go/test/endtoend/vreplication/cluster.go @@ -4,8 +4,10 @@ import ( "context" "errors" "fmt" + "math/rand" "os" "os/exec" + "path" "strings" "testing" "time" @@ -20,7 +22,8 @@ import ( ) var ( - vtdataroot string + originalVtdataroot string + vtdataroot string ) var globalConfig = struct { @@ -81,9 +84,20 @@ type Tablet struct { DbServer *cluster.MysqlctlProcess } +func init() { + originalVtdataroot = os.Getenv("VTDATAROOT") +} + func initGlobals() { - vtdataroot = os.Getenv("VTDATAROOT") + rand.Seed(time.Now().UTC().UnixNano()) + dirSuffix := 100000 + rand.Intn(999999-100000) // 6 digits + vtdataroot = path.Join(originalVtdataroot, fmt.Sprintf("vreple2e_%d", dirSuffix)) globalConfig.tmpDir = vtdataroot + "/tmp" + if _, err := os.Stat(vtdataroot); os.IsNotExist(err) { + os.Mkdir(vtdataroot, 0700) + } + _ = os.Setenv("VTDATAROOT", vtdataroot) + fmt.Printf("VTDATAROOT is %s\n", vtdataroot) } // NewVitessCluster creates an entire VitessCluster for e2e testing @@ -186,7 +200,7 @@ func (vc *VitessCluster) AddTablet(t *testing.T, cell *Cell, keyspace *Keyspace, vc.Topo.Port, globalConfig.hostname, globalConfig.tmpDir, - nil, + []string{"-queryserver-config-schema-reload-time", "5"}, //FIXME: for multi-cell initial schema doesn't seem to load without this false) assert.NotNil(t, vttablet) vttablet.SupportsBackup = false @@ -366,18 +380,12 @@ func (vc *VitessCluster) TearDown() { } } - for _, proc := range dbProcesses { - if err := proc.Wait(); err != nil { - fmt.Printf("Error waiting for mysql to stop: %s\n", err.Error()) - } - } - if err := vc.Vtctld.TearDown(); err != nil { fmt.Printf("Error stopping Vtctld: %s\n", err.Error()) } for _, cell := range vc.Cells { - if err := vc.Topo.TearDown(cell.Name, vtdataroot, vtdataroot, false, "etcd2"); err != nil { + if err := vc.Topo.TearDown(cell.Name, originalVtdataroot, vtdataroot, false, "etcd2"); err != nil { fmt.Printf("Error in etcd teardown - %s\n", err.Error()) } } diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 47493e99785..db4e079d14a 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -35,10 +35,11 @@ import ( var ( vc *VitessCluster vtgate *cluster.VtgateProcess - cell *Cell + defaultCell *Cell vtgateConn *mysql.Conn defaultRdonly int defaultReplicas int + allCellNames string ) func init() { @@ -47,16 +48,16 @@ func init() { } func TestBasicVreplicationWorkflow(t *testing.T) { - cellName := "zone1" - - vc = InitCluster(t, []string{cellName}) + defaultCellName := "zone1" + allCellNames = "zone1" + vc = InitCluster(t, []string{defaultCellName}) assert.NotNil(t, vc) defer vc.TearDown() - cell = vc.Cells[cellName] - vc.AddKeyspace(t, []*Cell{cell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100) - vtgate = cell.Vtgates[0] + defaultCell = vc.Cells[defaultCellName] + vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100) + vtgate = defaultCell.Vtgates[0] assert.NotNil(t, vtgate) vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "product", "0"), 1) @@ -64,7 +65,7 @@ func TestBasicVreplicationWorkflow(t *testing.T) { defer vtgateConn.Close() verifyClusterHealth(t) insertInitialData(t) - shardCustomer(t, true, []*Cell{cell}, cellName) + shardCustomer(t, true, []*Cell{defaultCell}, defaultCellName) shardOrders(t) shardMerchant(t) @@ -77,7 +78,7 @@ func TestBasicVreplicationWorkflow(t *testing.T) { reshardMerchant3to1Merge(t) insertMoreCustomers(t, 16) - reshardCustomer2to4Split(t) + reshardCustomer2to4Split(t, nil, "") expectNumberOfStreams(t, vtgateConn, "Customer2to4", "sales", "product:0", 4) reshardCustomer3to2SplitMerge(t) expectNumberOfStreams(t, vtgateConn, "Customer3to2", "sales", "product:0", 3) @@ -87,9 +88,12 @@ func TestBasicVreplicationWorkflow(t *testing.T) { func TestMultiCellVreplicationWorkflow(t *testing.T) { cells := []string{"zone1", "zone2"} + allCellNames = "zone1,zone2" vc = InitCluster(t, cells) assert.NotNil(t, vc) + defaultCellName := "zone1" + defaultCell = vc.Cells[defaultCellName] defer vc.TearDown() @@ -100,16 +104,20 @@ func TestMultiCellVreplicationWorkflow(t *testing.T) { vtgate = cell1.Vtgates[0] assert.NotNil(t, vtgate) vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "product", "0"), 1) + vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", "product", "0"), 2) vtgateConn = getConnection(t, globalConfig.vtgateMySQLPort) defer vtgateConn.Close() verifyClusterHealth(t) insertInitialData(t) shardCustomer(t, true, []*Cell{cell1, cell2}, cell2.Name) - // TODO: Test resharding once -cells option is added to Reshard command - //insertMoreCustomers(t, 16) - //reshardCustomer2to4Split(t) - //expectNumberOfStreams(t, vtgateConn, "Customer2to4", "sales", "product:0", 4) + + insertMoreCustomers(t, 16) + reshardCustomer2to4Split(t, []*Cell{cell1, cell2}, cell2.Name) + validateCount(t, vtgateConn, "customer:-40", "customer", 5) + validateCount(t, vtgateConn, "customer:40-80", "customer", 5) + validateCount(t, vtgateConn, "customer:80-c0", "customer", 6) + validateCount(t, vtgateConn, "customer:c0-", "customer", 5) } func TestCellAliasVreplicationWorkflow(t *testing.T) { @@ -117,6 +125,9 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) { vc = InitCluster(t, cells) assert.NotNil(t, vc) + allCellNames = "zone1,zone2" + defaultCellName := "zone1" + defaultCell = vc.Cells[defaultCellName] defer vc.TearDown() @@ -131,16 +142,20 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) { vtgate = cell1.Vtgates[0] assert.NotNil(t, vtgate) vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "product", "0"), 1) + vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", "product", "0"), 2) vtgateConn = getConnection(t, globalConfig.vtgateMySQLPort) defer vtgateConn.Close() verifyClusterHealth(t) insertInitialData(t) shardCustomer(t, true, []*Cell{cell1, cell2}, "alias") - // TODO: Test resharding once -cells option is added to Reshard command - //insertMoreCustomers(t, 16) - //reshardCustomer2to4Split(t) - //expectNumberOfStreams(t, vtgateConn, "Customer2to4", "sales", "product:0", 4) + + insertMoreCustomers(t, 16) + reshardCustomer2to4Split(t, []*Cell{cell1, cell2}, "zone1") + validateCount(t, vtgateConn, "customer:-40", "customer", 5) + validateCount(t, vtgateConn, "customer:40-80", "customer", 5) + validateCount(t, vtgateConn, "customer:80-c0", "customer", 6) + validateCount(t, vtgateConn, "customer:c0-", "customer", 5) } func insertInitialData(t *testing.T) { @@ -181,16 +196,16 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl t.Fatal(err) } - if err := vc.VtctlClient.ExecuteCommand("MoveTables", "-cell="+sourceCellOrAlias, "-workflow=p2c", + if err := vc.VtctlClient.ExecuteCommand("MoveTables", "-cells="+sourceCellOrAlias, "-workflow=p2c", "-tablet_types="+"replica,rdonly", "product", "customer", "customer"); err != nil { t.Fatalf("MoveTables command failed with %+v\n", err) } // Assume we are operating on first cell - cell := cells[0] + defaultCell := cells[0] - customerTab1 := vc.Cells[cell.Name].Keyspaces["customer"].Shards["-80"].Tablets["zone1-200"].Vttablet - customerTab2 := vc.Cells[cell.Name].Keyspaces["customer"].Shards["80-"].Tablets["zone1-300"].Vttablet + customerTab1 := vc.Cells[defaultCell.Name].Keyspaces["customer"].Shards["-80"].Tablets["zone1-200"].Vttablet + customerTab2 := vc.Cells[defaultCell.Name].Keyspaces["customer"].Shards["80-"].Tablets["zone1-300"].Vttablet if vc.WaitForVReplicationToCatchup(customerTab1, "p2c", "vt_customer", 1*time.Second) != nil { t.Fatal("MoveTables timed out for customer.p2c -80") @@ -200,8 +215,8 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl t.Fatal("MoveTables timed out for customer.p2c 80-") } - productTab := vc.Cells[cell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet - productTabReplica := vc.Cells[cell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-101"].Vttablet + productTab := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet + productTabReplica := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-101"].Vttablet query := "select * from customer" assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", query, query)) insertQuery1 := "insert into customer(cid, name) values(1001, 'tempCustomer1')" @@ -211,15 +226,15 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl var output string var err error - if output, err = vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=rdonly", "customer.p2c"); err != nil { + if output, err = vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+allCellNames, "-tablet_type=rdonly", "customer.p2c"); err != nil { t.Fatalf("SwitchReads error: %s\n", output) } want := dryRunResultsReadCustomerShard - if output, err = vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=replica", "-dry_run", "customer.p2c"); err != nil { + if output, err = vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+allCellNames, "-tablet_type=replica", "-dry_run", "customer.p2c"); err != nil { t.Fatalf("SwitchReads Dry Run error: %s\n", output) } validateDryRunResults(t, output, want) - if output, err = vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=replica", "customer.p2c"); err != nil { + if output, err = vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+allCellNames, "-tablet_type=replica", "customer.p2c"); err != nil { t.Fatalf("SwitchReads error: %s\n", output) } @@ -246,10 +261,10 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl if testReverse { //Reverse Replicate - if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=rdonly", "product.p2c_reverse"); err != nil { + if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+allCellNames, "-tablet_type=rdonly", "product.p2c_reverse"); err != nil { t.Fatalf("SwitchReads error: %s\n", output) } - if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=replica", "product.p2c_reverse"); err != nil { + if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+allCellNames, "-tablet_type=replica", "product.p2c_reverse"); err != nil { t.Fatalf("SwitchReads error: %s\n", output) } if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites", "product.p2c_reverse"); err != nil { @@ -263,10 +278,10 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl assert.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery1, matchInsertQuery1)) //Go forward again - if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=rdonly", "customer.p2c"); err != nil { + if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+allCellNames, "-tablet_type=rdonly", "customer.p2c"); err != nil { t.Fatalf("SwitchReads error: %s\n", output) } - if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=replica", "customer.p2c"); err != nil { + if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+allCellNames, "-tablet_type=replica", "customer.p2c"); err != nil { t.Fatalf("SwitchReads error: %s\n", output) } if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites", "customer.p2c"); err != nil { @@ -326,7 +341,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl assert.Empty(t, validateCountInTablet(t, customerTab2, "customer", "customer", 2)) assert.Empty(t, validateCount(t, vtgateConn, "customer", "customer.customer", 3)) - query = "insert into customer (name) values('george')" + query = "insert into customer (name, cid) values('george', 5)" execVtgateQuery(t, vtgateConn, "customer", query) assert.Empty(t, validateCountInTablet(t, customerTab1, "customer", "customer", 1)) assert.Empty(t, validateCountInTablet(t, customerTab2, "customer", "customer", 3)) @@ -334,10 +349,10 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl } } -func reshardCustomer2to4Split(t *testing.T) { +func reshardCustomer2to4Split(t *testing.T, cells []*Cell, sourceCellOrAlias string) { ksName := "customer" counts := map[string]int{"zone1-600": 4, "zone1-700": 5, "zone1-800": 6, "zone1-900": 5} - reshard(t, ksName, "customer", "c2c4", "-80,80-", "-40,40-80,80-c0,c0-", 600, counts, nil) + reshard(t, ksName, "customer", "c2c4", "-80,80-", "-40,40-80,80-c0,c0-", 600, counts, nil, cells, sourceCellOrAlias) assert.Empty(t, validateCount(t, vtgateConn, ksName, "customer", 20)) query := "insert into customer (name) values('yoko')" execVtgateQuery(t, vtgateConn, ksName, query) @@ -347,7 +362,7 @@ func reshardCustomer2to4Split(t *testing.T) { func reshardMerchant2to3SplitMerge(t *testing.T) { ksName := "merchant" counts := map[string]int{"zone1-1600": 0, "zone1-1700": 2, "zone1-1800": 0} - reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-", 1600, counts, dryrunresultsswitchwritesM2m3) + reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-", 1600, counts, dryrunresultsswitchwritesM2m3, nil, "") assert.Empty(t, validateCount(t, vtgateConn, ksName, "merchant", 2)) query := "insert into merchant (mname, category) values('amazon', 'electronics')" execVtgateQuery(t, vtgateConn, ksName, query) @@ -390,7 +405,7 @@ func reshardMerchant2to3SplitMerge(t *testing.T) { func reshardMerchant3to1Merge(t *testing.T) { ksName := "merchant" counts := map[string]int{"zone1-2000": 3} - reshard(t, ksName, "merchant", "m3m1", "-40,40-c0,c0-", "0", 2000, counts, nil) + reshard(t, ksName, "merchant", "m3m1", "-40,40-c0,c0-", "0", 2000, counts, nil, nil, "") assert.Empty(t, validateCount(t, vtgateConn, ksName, "merchant", 3)) query := "insert into merchant (mname, category) values('flipkart', 'electronics')" execVtgateQuery(t, vtgateConn, ksName, query) @@ -400,35 +415,41 @@ func reshardMerchant3to1Merge(t *testing.T) { func reshardCustomer3to2SplitMerge(t *testing.T) { //-40,40-80,80-c0 => merge/split, c0- stays the same ending up with 3 ksName := "customer" counts := map[string]int{"zone1-1000": 7, "zone1-1100": 9, "zone1-1200": 5} - reshard(t, ksName, "customer", "c4c3", "-40,40-80,80-c0", "-60,60-c0", 1000, counts, nil) + reshard(t, ksName, "customer", "c4c3", "-40,40-80,80-c0", "-60,60-c0", 1000, counts, nil, nil, "") } func reshardCustomer3to1Merge(t *testing.T) { //to unsharded ksName := "customer" counts := map[string]int{"zone1-1500": 21} - reshard(t, ksName, "customer", "c3c1", "-60,60-c0,c0-", "0", 1500, counts, nil) + reshard(t, ksName, "customer", "c3c1", "-60,60-c0,c0-", "0", 1500, counts, nil, nil, "") } -func reshard(t *testing.T, ksName string, tableName string, workflow string, sourceShards string, targetShards string, tabletIDBase int, counts map[string]int, dryRunResultswitchWrites []string) { +func reshard(t *testing.T, ksName string, tableName string, workflow string, sourceShards string, targetShards string, tabletIDBase int, counts map[string]int, dryRunResultswitchWrites []string, cells []*Cell, sourceCellOrAlias string) { + if cells == nil { + cells = []*Cell{defaultCell} + } + if sourceCellOrAlias == "" { + sourceCellOrAlias = defaultCell.Name + } ksWorkflow := ksName + "." + workflow - keyspace := vc.Cells[cell.Name].Keyspaces[ksName] - require.NoError(t, vc.AddShards(t, []*Cell{cell}, keyspace, targetShards, defaultReplicas, defaultRdonly, tabletIDBase)) - arrShardNames := strings.Split(targetShards, ",") + keyspace := vc.Cells[defaultCell.Name].Keyspaces[ksName] + require.NoError(t, vc.AddShards(t, cells, keyspace, targetShards, defaultReplicas, defaultRdonly, tabletIDBase)) + arrTargetShardNames := strings.Split(targetShards, ",") - for _, shardName := range arrShardNames { + for _, shardName := range arrTargetShardNames { if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", ksName, shardName), 1); err != nil { t.Fatal(err) } } - if err := vc.VtctlClient.ExecuteCommand("Reshard", ksWorkflow, sourceShards, targetShards); err != nil { + if err := vc.VtctlClient.ExecuteCommand("Reshard", "-cells="+sourceCellOrAlias, "-tablet_types=replica,master", ksWorkflow, sourceShards, targetShards); err != nil { t.Fatalf("Reshard command failed with %+v\n", err) } - tablets := vc.getVttabletsInKeyspace(t, cell, ksName, "master") + tablets := vc.getVttabletsInKeyspace(t, defaultCell, ksName, "master") targetShards = "," + targetShards + "," for _, tab := range tablets { if strings.Contains(targetShards, ","+tab.Shard+",") { fmt.Printf("Waiting for vrepl to catch up on %s since it IS a target shard\n", tab.Shard) - if vc.WaitForVReplicationToCatchup(tab, workflow, "vt_"+ksName, 3*time.Second) != nil { + if vc.WaitForVReplicationToCatchup(tab, workflow, "vt_"+ksName, 10*time.Second) != nil { t.Fatal("Reshard timed out") } } else { @@ -437,10 +458,10 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou } } vdiff(t, ksWorkflow) - if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=rdonly", ksWorkflow); err != nil { + if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+allCellNames, "-tablet_type=rdonly", ksWorkflow); err != nil { t.Fatalf("SwitchReads error: %s\n", output) } - if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=replica", ksWorkflow); err != nil { + if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+allCellNames, "-tablet_type=replica", ksWorkflow); err != nil { t.Fatalf("SwitchReads error: %s\n", output) } @@ -471,12 +492,12 @@ func shardOrders(t *testing.T) { if err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "-vschema", ordersVSchema, "customer"); err != nil { t.Fatal(err) } - if err := vc.VtctlClient.ExecuteCommand("MoveTables", "-cell="+cell.Name, "-workflow=o2c", + if err := vc.VtctlClient.ExecuteCommand("MoveTables", "-cells="+defaultCell.Name, "-workflow=o2c", "-tablet_types="+"replica,rdonly", "product", "customer", "orders"); err != nil { t.Fatal(err) } - customerTab1 := vc.Cells[cell.Name].Keyspaces["customer"].Shards["-80"].Tablets["zone1-200"].Vttablet - customerTab2 := vc.Cells[cell.Name].Keyspaces["customer"].Shards["80-"].Tablets["zone1-300"].Vttablet + customerTab1 := vc.Cells[defaultCell.Name].Keyspaces["customer"].Shards["-80"].Tablets["zone1-200"].Vttablet + customerTab2 := vc.Cells[defaultCell.Name].Keyspaces["customer"].Shards["80-"].Tablets["zone1-300"].Vttablet if vc.WaitForVReplicationToCatchup(customerTab1, "o2c", "vt_customer", 1*time.Second) != nil { assert.Fail(t, "MoveTables timed out for customer.o2c -80") @@ -486,10 +507,10 @@ func shardOrders(t *testing.T) { } vdiff(t, "customer.o2c") - if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=rdonly", "customer.o2c"); err != nil { + if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+allCellNames, "-tablet_type=rdonly", "customer.o2c"); err != nil { t.Fatalf("SwitchReads error: %s\n", output) } - if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=replica", "customer.o2c"); err != nil { + if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+allCellNames, "-tablet_type=replica", "customer.o2c"); err != nil { t.Fatalf("SwitchReads error: %s\n", output) } if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites", "customer.o2c"); err != nil { @@ -505,7 +526,7 @@ func shardOrders(t *testing.T) { } func shardMerchant(t *testing.T) { - if _, err := vc.AddKeyspace(t, []*Cell{cell}, "merchant", "-80,80-", merchantVSchema, "", defaultReplicas, defaultRdonly, 400); err != nil { + if _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "merchant", "-80,80-", merchantVSchema, "", defaultReplicas, defaultRdonly, 400); err != nil { t.Fatal(err) } if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "merchant", "-80"), 1); err != nil { @@ -514,13 +535,13 @@ func shardMerchant(t *testing.T) { if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "merchant", "80-"), 1); err != nil { t.Fatal(err) } - if err := vc.VtctlClient.ExecuteCommand("MoveTables", "-cell="+cell.Name, "-workflow=p2m", + if err := vc.VtctlClient.ExecuteCommand("MoveTables", "-cells="+defaultCell.Name, "-workflow=p2m", "-tablet_types="+"replica,rdonly", "product", "merchant", "merchant"); err != nil { t.Fatal(err) } - merchantTab1 := vc.Cells[cell.Name].Keyspaces["merchant"].Shards["-80"].Tablets["zone1-400"].Vttablet - merchantTab2 := vc.Cells[cell.Name].Keyspaces["merchant"].Shards["80-"].Tablets["zone1-500"].Vttablet + merchantTab1 := vc.Cells[defaultCell.Name].Keyspaces["merchant"].Shards["-80"].Tablets["zone1-400"].Vttablet + merchantTab2 := vc.Cells[defaultCell.Name].Keyspaces["merchant"].Shards["80-"].Tablets["zone1-500"].Vttablet if vc.WaitForVReplicationToCatchup(merchantTab1, "p2m", "vt_merchant", 1*time.Second) != nil { t.Fatal("MoveTables timed out for merchant.p2m -80") @@ -530,10 +551,10 @@ func shardMerchant(t *testing.T) { } vdiff(t, "merchant.p2m") - if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=rdonly", "merchant.p2m"); err != nil { + if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+allCellNames, "-tablet_type=rdonly", "merchant.p2m"); err != nil { t.Fatalf("SwitchReads error: %s\n", output) } - if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cell.Name, "-tablet_type=replica", "merchant.p2m"); err != nil { + if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+allCellNames, "-tablet_type=replica", "merchant.p2m"); err != nil { t.Fatalf("SwitchReads error: %s\n", output) } if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites", "merchant.p2m"); err != nil { @@ -577,7 +598,7 @@ func materializeProduct(t *testing.T) { if err := vc.VtctlClient.ExecuteCommand("Materialize", materializeProductSpec); err != nil { t.Fatal(err) } - customerTablets := vc.getVttabletsInKeyspace(t, cell, "customer", "master") + customerTablets := vc.getVttabletsInKeyspace(t, defaultCell, "customer", "master") for _, tab := range customerTablets { if vc.WaitForVReplicationToCatchup(tab, workflow, "vt_customer", 3*time.Second) != nil { t.Fatal("Materialize timed out") @@ -595,10 +616,9 @@ func materializeSales(t *testing.T) { if err := vc.VtctlClient.ExecuteCommand("Materialize", materializeSalesSpec); err != nil { t.Fatal(err) } - productTab := vc.Cells[cell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet + productTab := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet if vc.WaitForVReplicationToCatchup(productTab, "sales", "vt_product", 3*time.Second) != nil { - assert.Fail(t, "Materialize timed out for product.sales -80") - + assert.Fail(t, "Materialize timed out for product.sales") } assert.Empty(t, validateCount(t, vtgateConn, "product", "sales", 2)) assert.Empty(t, validateQuery(t, vtgateConn, "product:0", "select kount, amount from sales", @@ -611,7 +631,7 @@ func materializeMerchantSales(t *testing.T) { fmt.Printf("Materialize MerchantSales error is %+v", output) t.Fatal(err) } - merchantTablets := vc.getVttabletsInKeyspace(t, cell, "merchant", "master") + merchantTablets := vc.getVttabletsInKeyspace(t, defaultCell, "merchant", "master") for _, tab := range merchantTablets { if vc.WaitForVReplicationToCatchup(tab, workflow, "vt_merchant", 1*time.Second) != nil { t.Fatal("Materialize timed out") @@ -632,7 +652,7 @@ func materializeMerchantOrders(t *testing.T) { fmt.Printf("MerchantOrders error is %+v", output) t.Fatal(err) } - merchantTablets := vc.getVttabletsInKeyspace(t, cell, "merchant", "master") + merchantTablets := vc.getVttabletsInKeyspace(t, defaultCell, "merchant", "master") for _, tab := range merchantTablets { if vc.WaitForVReplicationToCatchup(tab, workflow, "vt_merchant", 1*time.Second) != nil { t.Fatal("Materialize timed out") diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index f0bea2f3000..a21ebe2e5d5 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -310,10 +310,10 @@ var commands = []commandGroup{ "[-ping-tablets] ", "Validates that all nodes reachable from the specified keyspace are consistent."}, {"Reshard", commandReshard, - "[-skip_schema_copy] ", + "[-cells=] [-tablet_types=] [-skip_schema_copy] ", "Start a Resharding process. Example: Reshard ks.workflow001 '0' '-80,80-'"}, {"MoveTables", commandMoveTables, - "[-cell=] [-tablet_types=] -workflow= ", + "[-cells=] [-tablet_types=] -workflow= ", `Move table(s) to another keyspace, table_specs is a list of tables or the tables section of the vschema for the target keyspace. Example: '{"t1":{"column_vindexes": [{""column": "id1", "name": "hash"}]}, "t2":{"column_vindexes": [{""column": "id2", "name": "hash"}]}}`}, {"DropSources", commandDropSources, "[-dry_run] [-rename_tables] ", @@ -1851,7 +1851,7 @@ func commandValidateKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlag } func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { - cell := subFlags.String("cell", "", "Cell to replicate from.") + cells := subFlags.String("cells", "", "Cells (comma-separated) to replicate from.") tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from.") skipSchemaCopy := subFlags.Bool("skip_schema_copy", false, "Skip copying of schema to targets") if err := subFlags.Parse(args); err != nil { @@ -1866,12 +1866,12 @@ func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.F } source := strings.Split(subFlags.Arg(1), ",") target := strings.Split(subFlags.Arg(2), ",") - return wr.Reshard(ctx, keyspace, workflow, source, target, *skipSchemaCopy, *cell, *tabletTypes) + return wr.Reshard(ctx, keyspace, workflow, source, target, *skipSchemaCopy, *cells, *tabletTypes) } func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { workflow := subFlags.String("workflow", "", "Workflow name. Will be used to later migrate traffic.") - cell := subFlags.String("cell", "", "Cell to replicate from.") + cells := subFlags.String("cells", "", "Cell (comma-separated) to replicate from.") tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from.") if err := subFlags.Parse(args); err != nil { return err @@ -1885,7 +1885,7 @@ func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla source := subFlags.Arg(0) target := subFlags.Arg(1) tableSpecs := subFlags.Arg(2) - return wr.MoveTables(ctx, *workflow, source, target, tableSpecs, *cell, *tabletTypes) + return wr.MoveTables(ctx, *workflow, source, target, tableSpecs, *cells, *tabletTypes) } func commandCreateLookupVindex(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go index f1c7b118711..12464d7297a 100644 --- a/go/vt/wrangler/schema.go +++ b/go/vt/wrangler/schema.go @@ -280,7 +280,7 @@ func (wr *Wrangler) CopySchemaShardFromShard(ctx context.Context, tables, exclud // CopySchemaShard copies the schema from a source tablet to the // specified shard. The schema is applied directly on the master of -// the destination shard, and is propogated to the replicas through +// the destination shard, and is propagated to the replicas through // binlogs. func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodatapb.TabletAlias, tables, excludeTables []string, includeViews bool, destKeyspace, destShard string, waitReplicasTimeout time.Duration, skipVerify bool) error { destShardInfo, err := wr.ts.GetShard(ctx, destKeyspace, destShard) From ac67c7b9c2a5ec01c636804d0bb30bc82252d366 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 20 Jul 2020 14:50:26 +0200 Subject: [PATCH 3/4] Vrepl Crosscell: fix linter error, remove unused code Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/cluster.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/go/test/endtoend/vreplication/cluster.go b/go/test/endtoend/vreplication/cluster.go index cf4713cd36b..89198a2281b 100644 --- a/go/test/endtoend/vreplication/cluster.go +++ b/go/test/endtoend/vreplication/cluster.go @@ -359,16 +359,13 @@ func (vc *VitessCluster) TearDown() { } } } - var dbProcesses []*exec.Cmd for _, cell := range vc.Cells { for _, keyspace := range cell.Keyspaces { for _, shard := range keyspace.Shards { for _, tablet := range shard.Tablets { if tablet.DbServer != nil && tablet.DbServer.TabletUID > 0 { - if proc, err := tablet.DbServer.StopProcess(); err != nil { + if _, err := tablet.DbServer.StopProcess(); err != nil { log.Errorf("Error stopping mysql process: %s", err.Error()) - } else { - dbProcesses = append(dbProcesses, proc) } } fmt.Printf("Stopping vttablet %s\n", tablet.Name) From 2513e1a857073aba2c367af18c6fc29c1e7479cc Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 22 Jul 2020 13:38:20 +0200 Subject: [PATCH 4/4] VRepl Crosscell: address review comments Signed-off-by: Rohit Nayak --- .../vreplication/vreplication_test.go | 2 +- go/vt/vtctl/vtctl.go | 6 +- go/vt/wrangler/resharder.go | 2 +- go/vt/wrangler/resharder_test.go | 77 ++++++++++++++----- 4 files changed, 63 insertions(+), 24 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index db4e079d14a..b54caf84463 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -151,7 +151,7 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) { shardCustomer(t, true, []*Cell{cell1, cell2}, "alias") insertMoreCustomers(t, 16) - reshardCustomer2to4Split(t, []*Cell{cell1, cell2}, "zone1") + reshardCustomer2to4Split(t, []*Cell{cell1, cell2}, "alias") validateCount(t, vtgateConn, "customer:-40", "customer", 5) validateCount(t, vtgateConn, "customer:40-80", "customer", 5) validateCount(t, vtgateConn, "customer:80-c0", "customer", 6) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index a21ebe2e5d5..9df6ca065bc 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -311,7 +311,7 @@ var commands = []commandGroup{ "Validates that all nodes reachable from the specified keyspace are consistent."}, {"Reshard", commandReshard, "[-cells=] [-tablet_types=] [-skip_schema_copy] ", - "Start a Resharding process. Example: Reshard ks.workflow001 '0' '-80,80-'"}, + "Start a Resharding process. Example: Reshard -cells 'zone1,alias1' -tablet_types='master,replica,rdonly' ks.workflow001 '0' '-80,80-'"}, {"MoveTables", commandMoveTables, "[-cells=] [-tablet_types=] -workflow= ", `Move table(s) to another keyspace, table_specs is a list of tables or the tables section of the vschema for the target keyspace. Example: '{"t1":{"column_vindexes": [{""column": "id1", "name": "hash"}]}, "t2":{"column_vindexes": [{""column": "id2", "name": "hash"}]}}`}, @@ -1851,7 +1851,7 @@ func commandValidateKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlag } func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { - cells := subFlags.String("cells", "", "Cells (comma-separated) to replicate from.") + cells := subFlags.String("cells", "", "Cell(s) or CellAlias(es) (comma-separated) to replicate from.") tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from.") skipSchemaCopy := subFlags.Bool("skip_schema_copy", false, "Skip copying of schema to targets") if err := subFlags.Parse(args); err != nil { @@ -1871,7 +1871,7 @@ func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.F func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { workflow := subFlags.String("workflow", "", "Workflow name. Will be used to later migrate traffic.") - cells := subFlags.String("cells", "", "Cell (comma-separated) to replicate from.") + cells := subFlags.String("cells", "", "Cell(s) or CellAlias(es) (comma-separated) to replicate from.") tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from.") if err := subFlags.Parse(args); err != nil { return err diff --git a/go/vt/wrangler/resharder.go b/go/vt/wrangler/resharder.go index bc12ef570ef..131f2fadc63 100644 --- a/go/vt/wrangler/resharder.go +++ b/go/vt/wrangler/resharder.go @@ -47,7 +47,7 @@ type resharder struct { targetMasters map[string]*topo.TabletInfo vschema *vschemapb.Keyspace refStreams map[string]*refStream - cell string + cell string //single cell or cellsAlias or comma-separated list of cells/cellsAliases tabletTypes string } diff --git a/go/vt/wrangler/resharder_test.go b/go/vt/wrangler/resharder_test.go index 21eab01c2cd..f6ee5d7c5b8 100644 --- a/go/vt/wrangler/resharder_test.go +++ b/go/vt/wrangler/resharder_test.go @@ -21,6 +21,8 @@ import ( "strings" "testing" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" @@ -50,27 +52,64 @@ func TestResharderOneToMany(t *testing.T) { env.expectValidation() env.expectNoRefStream() - env.tmc.expectVRQuery( - 200, - insertPrefix+ - `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`+ - eol, - &sqltypes.Result{}, - ) - env.tmc.expectVRQuery( - 210, - insertPrefix+ - `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`+ - eol, - &sqltypes.Result{}, - ) + type testCase struct { + cells string + tabletTypes string + } + var newTestCase = func(cells, tabletTypes string) *testCase { + return &testCase{ + cells: cells, + tabletTypes: tabletTypes, + } + } + var testCases []*testCase - env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) - env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + testCases = append(testCases, newTestCase("", "")) + testCases = append(testCases, newTestCase("cell", "master")) + testCases = append(testCases, newTestCase("cell", "master,replica")) + testCases = append(testCases, newTestCase("", "replica,rdonly")) + + for _, tc := range testCases { + env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + env.expectValidation() + env.expectNoRefStream() + name := tc.cells + "/" + tc.tabletTypes + t.Run(name, func(t *testing.T) { + env.tmc.expectVRQuery( + 200, + insertPrefix+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', [0-9]*, [0-9]*, '`+ + tc.cells+`', '`+tc.tabletTypes+`', [0-9]*, 0, 'Stopped', 'vt_ks'\)`+eol, + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery( + 210, + insertPrefix+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', [0-9]*, [0-9]*, '`+ + tc.cells+`', '`+tc.tabletTypes+`', [0-9]*, 0, 'Stopped', 'vt_ks'\)`+eol, + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, tc.cells, tc.tabletTypes) + require.NoError(t, err) + env.tmc.verifyQueries(t) + }) + env.close() + } - err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true, "", "") - assert.NoError(t, err) - env.tmc.verifyQueries(t) } func TestResharderManyToOne(t *testing.T) {