From e095c71d09a93a54164bbeb478703c06741b7b4c Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Fri, 25 Oct 2019 23:08:17 -0700 Subject: [PATCH 01/10] vreplication: Reshard initial cut Signed-off-by: Sugu Sougoumarane --- go/vt/key/key.go | 15 +++++ go/vt/key/key_test.go | 101 ++++++++++++++++++++++++++++ go/vt/topotools/split.go | 52 ++++++++++++++ go/vt/topotools/split_test.go | 59 ++++++++++++++++ go/vt/vtctl/vtctl.go | 19 ++++++ go/vt/wrangler/keyspace.go | 67 ++++++++++++++++++ go/vt/wrangler/migrater_env_test.go | 12 ---- 7 files changed, 313 insertions(+), 12 deletions(-) diff --git a/go/vt/key/key.go b/go/vt/key/key.go index b9ae9698403..0bd7f137a34 100644 --- a/go/vt/key/key.go +++ b/go/vt/key/key.go @@ -120,6 +120,21 @@ func EvenShardsKeyRange(i, n int) (*topodatapb.KeyRange, error) { return &topodatapb.KeyRange{Start: startBytes, End: endBytes}, nil } +// KeyRangeAdd adds two adjacent keyranges into a single value. +// If the values are not adjacent, it returns false. +func KeyRangeAdd(first, second *topodatapb.KeyRange) (*topodatapb.KeyRange, bool) { + if first == nil || second == nil { + return nil, false + } + if len(first.End) != 0 && bytes.Equal(first.End, second.Start) { + return &topodatapb.KeyRange{Start: first.Start, End: second.End}, true + } + if len(second.End) != 0 && bytes.Equal(second.End, first.Start) { + return &topodatapb.KeyRange{Start: second.Start, End: first.End}, true + } + return nil, false +} + // KeyRangeContains returns true if the provided id is in the keyrange. func KeyRangeContains(kr *topodatapb.KeyRange, id []byte) bool { if kr == nil { diff --git a/go/vt/key/key_test.go b/go/vt/key/key_test.go index 9d88ba19ea3..89bac6311fc 100644 --- a/go/vt/key/key_test.go +++ b/go/vt/key/key_test.go @@ -139,6 +139,107 @@ func TestEvenShardsKeyRange(t *testing.T) { } } +func TestKeyRangeAdd(t *testing.T) { + testcases := []struct { + first string + second string + out string + ok bool + }{{ + first: "", + second: "", + out: "", + ok: false, + }, { + first: "", + second: "-80", + out: "", + ok: false, + }, { + first: "-80", + second: "", + out: "", + ok: false, + }, { + first: "", + second: "80-", + out: "", + ok: false, + }, { + first: "80-", + second: "", + out: "", + ok: false, + }, { + first: "80-", + second: "-40", + out: "", + ok: false, + }, { + first: "-40", + second: "80-", + out: "", + ok: false, + }, { + first: "-80", + second: "80-", + out: "-", + ok: true, + }, { + first: "80-", + second: "-80", + out: "-", + ok: true, + }, { + first: "-40", + second: "40-80", + out: "-80", + ok: true, + }, { + first: "40-80", + second: "-40", + out: "-80", + ok: true, + }, { + first: "40-80", + second: "80-c0", + out: "40-c0", + ok: true, + }, { + first: "80-c0", + second: "40-80", + out: "40-c0", + ok: true, + }} + stringToKeyRange := func(spec string) *topodatapb.KeyRange { + if spec == "" { + return nil + } + parts := strings.Split(spec, "-") + if len(parts) != 2 { + panic("invalid spec") + } + kr, err := ParseKeyRangeParts(parts[0], parts[1]) + if err != nil { + panic(err) + } + return kr + } + keyRangeToString := func(kr *topodatapb.KeyRange) string { + if kr == nil { + return "" + } + return KeyRangeString(kr) + } + for _, tcase := range testcases { + first := stringToKeyRange(tcase.first) + second := stringToKeyRange(tcase.second) + out, ok := KeyRangeAdd(first, second) + assert.Equal(t, tcase.out, keyRangeToString(out)) + assert.Equal(t, tcase.ok, ok) + } +} + func TestEvenShardsKeyRange_Error(t *testing.T) { testCases := []struct { i, n int diff --git a/go/vt/topotools/split.go b/go/vt/topotools/split.go index 98c93e0eb6b..04431ddd699 100644 --- a/go/vt/topotools/split.go +++ b/go/vt/topotools/split.go @@ -17,14 +17,66 @@ limitations under the License. package topotools import ( + "errors" "fmt" "sort" "golang.org/x/net/context" "vitess.io/vitess/go/vt/key" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" ) +// ValidateForReshard returns an error if sourceShards cannot reshard into +// targetShards. +func ValidateForReshard(sourceShards, targetShards []*topo.ShardInfo) error { + for _, source := range sourceShards { + for _, target := range targetShards { + if key.KeyRangeEqual(source.KeyRange, target.KeyRange) { + return fmt.Errorf("same keyrange is present in source and target: %v", key.KeyRangeString(source.KeyRange)) + } + } + } + sourcekr, err := combineKeyRanges(sourceShards) + if err != nil { + return err + } + targetkr, err := combineKeyRanges(targetShards) + if err != nil { + return err + } + if !key.KeyRangeEqual(sourcekr, targetkr) { + return fmt.Errorf("source and target keyranges don't match: %v vs %v", key.KeyRangeString(sourcekr), key.KeyRangeString(targetkr)) + } + return nil +} + +func combineKeyRanges(shards []*topo.ShardInfo) (*topodatapb.KeyRange, error) { + if len(shards) == 0 { + return nil, fmt.Errorf("there are no shards to combine") + } + result := shards[0].KeyRange + krmap := make(map[string]*topodatapb.KeyRange) + for _, si := range shards[1:] { + krmap[si.ShardName()] = si.KeyRange + } + for len(krmap) != 0 { + foundOne := false + for k, kr := range krmap { + newkr, ok := key.KeyRangeAdd(result, kr) + if ok { + foundOne = true + result = newkr + delete(krmap, k) + } + } + if !foundOne { + return nil, errors.New("shards don't form a contiguous keyrange") + } + } + return result, nil +} + // OverlappingShards contains sets of shards that overlap which each-other. // With this library, there is no guarantee of which set will be left or right. type OverlappingShards struct { diff --git a/go/vt/topotools/split_test.go b/go/vt/topotools/split_test.go index 7ee34a15946..5dbbc0f686a 100644 --- a/go/vt/topotools/split_test.go +++ b/go/vt/topotools/split_test.go @@ -20,6 +20,7 @@ import ( "encoding/hex" "testing" + "github.com/stretchr/testify/assert" "vitess.io/vitess/go/vt/topo" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -94,6 +95,64 @@ func compareResultLists(t *testing.T, os []*OverlappingShards, expected []expect } } +func TestValidateForReshard(t *testing.T) { + testcases := []struct { + sources []string + targets []string + out string + }{{ + sources: []string{"-80", "80-"}, + targets: []string{"-40", "40-"}, + out: "", + }, { + sources: []string{"80-", "-80"}, + targets: []string{"-40", "40-"}, + out: "", + }, { + sources: []string{"-40", "40-80", "80-"}, + targets: []string{"-30", "30-"}, + out: "", + }, { + sources: []string{"0"}, + targets: []string{"-40", "40-"}, + out: "", + }, { + sources: []string{"-40", "40-80", "80-"}, + targets: []string{"-40", "40-"}, + out: "same keyrange is present in source and target: -40", + }, { + sources: []string{"-30", "30-80"}, + targets: []string{"-40", "40-"}, + out: "source and target keyranges don't match: -80 vs -", + }, { + sources: []string{"-30", "20-80"}, + targets: []string{"-40", "40-"}, + out: "shards don't form a contiguous keyrange", + }} + buildShards := func(shards []string) []*topo.ShardInfo { + sis := make([]*topo.ShardInfo, 0, len(shards)) + for _, shard := range shards { + _, kr, err := topo.ValidateShardName(shard) + if err != nil { + panic(err) + } + sis = append(sis, topo.NewShardInfo("", shard, &topodatapb.Shard{KeyRange: kr}, nil)) + } + return sis + } + + for _, tcase := range testcases { + sources := buildShards(tcase.sources) + targets := buildShards(tcase.targets) + err := ValidateForReshard(sources, targets) + if tcase.out == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tcase.out) + } + } +} + func TestFindOverlappingShardsNoOverlap(t *testing.T) { var shardMap map[string]*topo.ShardInfo var os []*OverlappingShards diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 0048d12fc30..0ea26d69c76 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -307,6 +307,9 @@ var commands = []commandGroup{ {"ValidateKeyspace", commandValidateKeyspace, "[-ping-tablets] ", "Validates that all nodes reachable from the specified keyspace are consistent."}, + {"Reshard", commandReshard, + " ", + "Start a Resharding process. Example: Reshard ks.workflow001 '0' '-80,80-'"}, {"SplitClone", commandSplitClone, " ", "Start the SplitClone process to perform horizontal resharding. Example: SplitClone ks '0' '-80,80-'"}, @@ -1784,6 +1787,22 @@ func commandValidateKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlag return wr.ValidateKeyspace(ctx, keyspace, *pingTablets) } +func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { + if err := subFlags.Parse(args); err != nil { + return err + } + if subFlags.NArg() != 3 { + return fmt.Errorf("three arguments are required: , source_shards, target_shards") + } + keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0)) + if err != nil { + return err + } + source := strings.Split(subFlags.Arg(1), ",") + target := strings.Split(subFlags.Arg(2), ",") + return wr.Reshard(ctx, workflow, keyspace, source, target) +} + func commandSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { if err := subFlags.Parse(args); err != nil { return err diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index 25e266df0dd..313f8301b5a 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -38,6 +38,7 @@ import ( "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/topotools/events" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/vindexes" ) const ( @@ -90,6 +91,72 @@ func (wr *Wrangler) SetKeyspaceShardingInfo(ctx context.Context, keyspace, shard return wr.ts.UpdateKeyspace(ctx, ki) } +// Reshard initiates a resharding workflow. +func (wr *Wrangler) Reshard(ctx context.Context, workflow, keyspace string, sources, targets []string) error { + var sourceShards, targetShards []*topo.ShardInfo + for _, shard := range sources { + si, err := wr.ts.GetShard(ctx, keyspace, shard) + if err != nil { + return vterrors.Wrapf(err, "GetShard(%s) failed", shard) + } + sourceShards = append(sourceShards, si) + } + for _, shard := range targets { + si, err := wr.ts.GetShard(ctx, keyspace, shard) + if err != nil { + return vterrors.Wrapf(err, "GetShard(%s) failed", shard) + } + targetShards = append(targetShards, si) + } + if err := topotools.ValidateForReshard(sourceShards, targetShards); err != nil { + return err + } + + // Exclude all reference tables. + vschema, err := wr.ts.GetVSchema(ctx, keyspace) + if err != nil { + return err + } + var excludeRules []*binlogdatapb.Rule + for tableName, ti := range vschema.Tables { + if ti.Type == vindexes.TypeReference { + excludeRules = append(excludeRules, &binlogdatapb.Rule{ + Match: tableName, + Filter: "exclude", + }) + } + } + + for _, dest := range targetShards { + master, err := wr.ts.GetTablet(ctx, dest.MasterAlias) + if err != nil { + return vterrors.Wrapf(err, "GetTablet(%v) failed", dest.MasterAlias) + } + for _, source := range sourceShards { + if !key.KeyRangesIntersect(dest.KeyRange, source.KeyRange) { + continue + } + filter := &binlogdatapb.Filter{ + Rules: append(excludeRules, &binlogdatapb.Rule{ + Match: "/.*", + Filter: key.KeyRangeString(dest.KeyRange), + }), + } + bls := &binlogdatapb.BinlogSource{ + Keyspace: keyspace, + Shard: source.ShardName(), + Filter: filter, + } + // TODO(sougou): do this in two phases. + cmd := binlogplayer.CreateVReplicationState(workflow, bls, "", binlogplayer.BlpRunning, master.DbName()) + if _, err := wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd); err != nil { + return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd) + } + } + } + return wr.refreshMasters(ctx, targetShards) +} + // SplitClone initiates a SplitClone workflow. func (wr *Wrangler) SplitClone(ctx context.Context, keyspace string, from, to []string) error { var fromShards, toShards []*topo.ShardInfo diff --git a/go/vt/wrangler/migrater_env_test.go b/go/vt/wrangler/migrater_env_test.go index 5ead8aea8cb..81cdc7a6a1e 100644 --- a/go/vt/wrangler/migrater_env_test.go +++ b/go/vt/wrangler/migrater_env_test.go @@ -85,9 +85,6 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, if err != nil { t.Fatal(err) } - if sourceKeyRange == nil { - sourceKeyRange = &topodatapb.KeyRange{} - } tme.sourceKeyRanges = append(tme.sourceKeyRanges, sourceKeyRange) } for _, shard := range targetShards { @@ -98,9 +95,6 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, if err != nil { t.Fatal(err) } - if targetKeyRange == nil { - targetKeyRange = &topodatapb.KeyRange{} - } tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) } @@ -209,9 +203,6 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe if err != nil { t.Fatal(err) } - if sourceKeyRange == nil { - sourceKeyRange = &topodatapb.KeyRange{} - } tme.sourceKeyRanges = append(tme.sourceKeyRanges, sourceKeyRange) } for _, shard := range targetShards { @@ -222,9 +213,6 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe if err != nil { t.Fatal(err) } - if targetKeyRange == nil { - targetKeyRange = &topodatapb.KeyRange{} - } tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) } From 2d68a5bc5977b99c07ea9092eb1a361fa8e8e57d Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sat, 26 Oct 2019 13:09:49 -0700 Subject: [PATCH 02/10] vreplication: move resharder to its own file Signed-off-by: Sugu Sougoumarane --- go.mod | 1 + go/vt/vtctl/vtctl.go | 2 +- go/vt/wrangler/keyspace.go | 83 +++------ go/vt/wrangler/resharder.go | 335 ++++++++++++++++++++++++++++++++++++ 4 files changed, 363 insertions(+), 58 deletions(-) create mode 100644 go/vt/wrangler/resharder.go diff --git a/go.mod b/go.mod index 6707d58528a..6308bc259ca 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/opentracing-contrib/go-grpc v0.0.0-20180928155321-4b5a12d3ff02 github.com/opentracing/opentracing-go v1.1.0 github.com/pborman/uuid v0.0.0-20160824210600-b984ec7fa9ff + github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.1.0 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 // indirect github.com/prometheus/common v0.7.0 // indirect diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 0ea26d69c76..4b2052130d1 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -1800,7 +1800,7 @@ func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.F } source := strings.Split(subFlags.Arg(1), ",") target := strings.Split(subFlags.Arg(2), ",") - return wr.Reshard(ctx, workflow, keyspace, source, target) + return wr.Reshard(ctx, keyspace, workflow, source, target) } func commandSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index 313f8301b5a..04dd6bdbaef 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -38,7 +38,6 @@ import ( "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/topotools/events" "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vtgate/vindexes" ) const ( @@ -91,70 +90,40 @@ func (wr *Wrangler) SetKeyspaceShardingInfo(ctx context.Context, keyspace, shard return wr.ts.UpdateKeyspace(ctx, ki) } -// Reshard initiates a resharding workflow. -func (wr *Wrangler) Reshard(ctx context.Context, workflow, keyspace string, sources, targets []string) error { - var sourceShards, targetShards []*topo.ShardInfo - for _, shard := range sources { - si, err := wr.ts.GetShard(ctx, keyspace, shard) - if err != nil { - return vterrors.Wrapf(err, "GetShard(%s) failed", shard) - } - sourceShards = append(sourceShards, si) - } - for _, shard := range targets { - si, err := wr.ts.GetShard(ctx, keyspace, shard) - if err != nil { - return vterrors.Wrapf(err, "GetShard(%s) failed", shard) - } - targetShards = append(targetShards, si) - } - if err := topotools.ValidateForReshard(sourceShards, targetShards); err != nil { - return err - } - - // Exclude all reference tables. - vschema, err := wr.ts.GetVSchema(ctx, keyspace) +// validateNewWorkflow ensures that the specified workflow doesn't already exist +// in the keyspace. +func (wr *Wrangler) validateNewWorkflow(ctx context.Context, keyspace, workflow string) error { + allshards, err := wr.ts.FindAllShardsInKeyspace(ctx, keyspace) if err != nil { return err } - var excludeRules []*binlogdatapb.Rule - for tableName, ti := range vschema.Tables { - if ti.Type == vindexes.TypeReference { - excludeRules = append(excludeRules, &binlogdatapb.Rule{ - Match: tableName, - Filter: "exclude", - }) - } - } + var wg sync.WaitGroup + allErrors := &concurrency.AllErrorRecorder{} + for _, si := range allshards { + wg.Add(1) + go func(si *topo.ShardInfo) { + defer wg.Done() - for _, dest := range targetShards { - master, err := wr.ts.GetTablet(ctx, dest.MasterAlias) - if err != nil { - return vterrors.Wrapf(err, "GetTablet(%v) failed", dest.MasterAlias) - } - for _, source := range sourceShards { - if !key.KeyRangesIntersect(dest.KeyRange, source.KeyRange) { - continue - } - filter := &binlogdatapb.Filter{ - Rules: append(excludeRules, &binlogdatapb.Rule{ - Match: "/.*", - Filter: key.KeyRangeString(dest.KeyRange), - }), + master, err := wr.ts.GetTablet(ctx, si.MasterAlias) + if err != nil { + allErrors.RecordError(vterrors.Wrap(err, "validateWorkflowName.GetTablet")) + return } - bls := &binlogdatapb.BinlogSource{ - Keyspace: keyspace, - Shard: source.ShardName(), - Filter: filter, + + query := fmt.Sprintf("select 1 from _vt.vreplication where db_name=%s and workflow=%s", encodeString(master.DbName()), encodeString(workflow)) + p3qr, err := wr.tmc.VReplicationExec(ctx, master.Tablet, query) + if err != nil { + allErrors.RecordError(vterrors.Wrap(err, "validateWorkflowName.VReplicationExec")) + return } - // TODO(sougou): do this in two phases. - cmd := binlogplayer.CreateVReplicationState(workflow, bls, "", binlogplayer.BlpRunning, master.DbName()) - if _, err := wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd); err != nil { - return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd) + if len(p3qr.Rows) != 0 { + allErrors.RecordError(fmt.Errorf("workflow %s already exists in keyspace %s", workflow, keyspace)) + return } - } + }(si) } - return wr.refreshMasters(ctx, targetShards) + wg.Wait() + return allErrors.AggrError(vterrors.Aggregate) } // SplitClone initiates a SplitClone workflow. diff --git a/go/vt/wrangler/resharder.go b/go/vt/wrangler/resharder.go new file mode 100644 index 00000000000..221747d6347 --- /dev/null +++ b/go/vt/wrangler/resharder.go @@ -0,0 +1,335 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrangler + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/golang/protobuf/proto" + "github.com/pkg/errors" + "golang.org/x/net/context" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/concurrency" + "vitess.io/vitess/go/vt/key" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + "vitess.io/vitess/go/vt/throttler" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/vindexes" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" +) + +type resharder struct { + wr *Wrangler + keyspace string + workflow string + sourceShards []*topo.ShardInfo + targetShards []*topo.ShardInfo + vschema *vschemapb.Keyspace + refStreams map[string]*refStream +} + +type refStream struct { + workflow string + bls *binlogdatapb.BinlogSource + cell string + tabletTypes string +} + +// Reshard initiates a resharding workflow. +func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sources, targets []string) error { + if err := wr.validateNewWorkflow(ctx, keyspace, workflow); err != nil { + return err + } + + rs, err := wr.buildResharder(ctx, keyspace, workflow, sources, targets) + if err != nil { + return vterrors.Wrap(err, "buildResharder") + } + if err := wr.refreshMasters(ctx, rs.targetShards); err != nil { + return errors.Wrap(err, "refreshMasters") + } + return nil +} + +func (wr *Wrangler) buildResharder(ctx context.Context, keyspace, workflow string, sources, targets []string) (*resharder, error) { + rs := &resharder{ + wr: wr, + keyspace: keyspace, + workflow: workflow, + } + for _, shard := range sources { + si, err := wr.ts.GetShard(ctx, keyspace, shard) + if err != nil { + return nil, vterrors.Wrapf(err, "GetShard(%s) failed", shard) + } + rs.sourceShards = append(rs.sourceShards, si) + } + for _, shard := range targets { + si, err := wr.ts.GetShard(ctx, keyspace, shard) + if err != nil { + return nil, vterrors.Wrapf(err, "GetShard(%s) failed", shard) + } + rs.targetShards = append(rs.targetShards, si) + } + if err := topotools.ValidateForReshard(rs.sourceShards, rs.targetShards); err != nil { + return nil, vterrors.Wrap(err, "ValidateForReshard") + } + + vschema, err := wr.ts.GetVSchema(ctx, keyspace) + if err != nil { + return nil, vterrors.Wrap(err, "GetVSchema") + } + rs.vschema = vschema + + if err := rs.readRefStreams(ctx); err != nil { + return nil, vterrors.Wrap(err, "readRefStreams") + } + return rs, nil +} + +func (rs *resharder) readRefStreams(ctx context.Context) error { + var mu sync.Mutex + err := rs.forAll(rs.sourceShards, func(source *topo.ShardInfo) error { + sourceMaster, err := rs.wr.ts.GetTablet(ctx, source.MasterAlias) + if err != nil { + return vterrors.Wrapf(err, "GetTablet(%v)", source.MasterAlias) + } + + query := fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name=%s", encodeString(sourceMaster.DbName())) + p3qr, err := rs.wr.tmc.VReplicationExec(ctx, sourceMaster.Tablet, query) + if err != nil { + return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", source.MasterAlias, query) + } + qr := sqltypes.Proto3ToResult(p3qr) + + mu.Lock() + defer mu.Unlock() + + mustCreate := false + var ref map[string]bool + if rs.refStreams == nil { + rs.refStreams = make(map[string]*refStream) + mustCreate = true + } else { + // Copy the ref streams for comparison. + ref = make(map[string]bool, len(rs.refStreams)) + for k := range rs.refStreams { + ref[k] = true + } + } + for _, row := range qr.Rows { + workflow := row[0].ToString() + if workflow == "" { + return fmt.Errorf("VReplication streams must have named workflows for migration: shard: %s:%s", source.Keyspace(), source.ShardName()) + } + var bls binlogdatapb.BinlogSource + if err := proto.UnmarshalText(row[1].ToString(), &bls); err != nil { + return vterrors.Wrapf(err, "UnmarshalText: %v", row) + } + isReference, err := rs.blsIsReference(&bls) + if err != nil { + return vterrors.Wrap(err, "blsIsReference") + } + if !isReference { + continue + } + key := fmt.Sprintf("%s:%s:%s", workflow, bls.Keyspace, bls.Shard) + if mustCreate { + rs.refStreams[key] = &refStream{ + workflow: workflow, + bls: &bls, + cell: row[2].ToString(), + tabletTypes: row[3].ToString(), + } + } else { + if !ref[key] { + return fmt.Errorf("streams are mismatched across source shards for workflow: %s", workflow) + } + delete(ref, key) + } + } + if len(ref) != 0 { + return fmt.Errorf("streams are mismatched across source shards: %v", ref) + } + return nil + }) + return err +} + +// blsIsReference is partially copied from streamMigrater.templatize. +// It reuses the constants from that function also. +func (rs *resharder) blsIsReference(bls *binlogdatapb.BinlogSource) (bool, error) { + streamType := unknown + for _, rule := range bls.Filter.Rules { + typ, err := rs.identifyRuleType(rule) + if err != nil { + return false, err + } + switch typ { + case sharded: + if streamType == reference { + return false, fmt.Errorf("cannot reshard streams with a mix of reference and sharded tables: %v", bls) + } + streamType = sharded + case reference: + if streamType == sharded { + return false, fmt.Errorf("cannot reshard streams with a mix of reference and sharded tables: %v", bls) + } + streamType = reference + } + } + return streamType == reference, nil +} + +func (rs *resharder) identifyRuleType(rule *binlogdatapb.Rule) (int, error) { + vtable, ok := rs.vschema.Tables[rule.Match] + if !ok { + return 0, fmt.Errorf("table %v not found in vschema", rule.Match) + } + if vtable.Type == vindexes.TypeReference { + return reference, nil + } + switch { + case rule.Filter == "": + return unknown, fmt.Errorf("rule %v does not have a select expression in vreplication", rule) + case key.IsKeyRange(rule.Filter): + return sharded, nil + case rule.Filter == vreplication.ExcludeStr: + return unknown, fmt.Errorf("unexpected rule in vreplication: %v", rule) + default: + return sharded, nil + } +} + +func (rs *resharder) createStreams(ctx context.Context) error { + var excludeRules []*binlogdatapb.Rule + for tableName, table := range rs.vschema.Tables { + if table.Type == vindexes.TypeReference { + excludeRules = append(excludeRules, &binlogdatapb.Rule{ + Match: tableName, + Filter: "exclude", + }) + } + } + + err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error { + master, err := rs.wr.ts.GetTablet(ctx, target.MasterAlias) + if err != nil { + return vterrors.Wrapf(err, "GetTablet(%v) failed", target.MasterAlias) + } + + buf := &strings.Builder{} + buf.WriteString("insert into _vt.vreplication(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name) values ") + prefix := "" + + addLine := func(workflow string, bls *binlogdatapb.BinlogSource, cell, tabletTypes string) { + fmt.Fprintf(buf, "%s(%v, %v, '', %v, %v, %v, %v, %v, 0, '%v', %v)", + prefix, + encodeString(workflow), + encodeString(bls.String()), + throttler.MaxRateModuleDisabled, + throttler.ReplicationLagModuleDisabled, + encodeString(cell), + encodeString(tabletTypes), + time.Now().Unix(), + binlogplayer.BlpStopped, + encodeString(master.DbName())) + prefix = ", " + } + + // copy excludeRules to prevent data race. + copyExcludeRules := append([]*binlogdatapb.Rule(nil), excludeRules...) + for _, source := range rs.sourceShards { + if !key.KeyRangesIntersect(target.KeyRange, source.KeyRange) { + continue + } + filter := &binlogdatapb.Filter{ + Rules: append(copyExcludeRules, &binlogdatapb.Rule{ + Match: "/.*", + Filter: key.KeyRangeString(target.KeyRange), + }), + } + bls := &binlogdatapb.BinlogSource{ + Keyspace: rs.keyspace, + Shard: source.ShardName(), + Filter: filter, + } + addLine(rs.workflow, bls, "", "") + } + + for _, rstream := range rs.refStreams { + addLine(rstream.workflow, rstream.bls, rstream.cell, rstream.tabletTypes) + } + query := buf.String() + if _, err := rs.wr.tmc.VReplicationExec(ctx, master.Tablet, query); err != nil { + return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", target.MasterAlias, query) + } + return nil + }) + + return err +} + +func (rs *resharder) startStreaming(ctx context.Context) error { + workflows := make(map[string]bool) + workflows[rs.workflow] = true + for _, rstream := range rs.refStreams { + workflows[rstream.workflow] = true + } + list := make([]string, 0, len(workflows)) + for k := range workflows { + list = append(list, k) + } + sort.Strings(list) + err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error { + master, err := rs.wr.ts.GetTablet(ctx, target.MasterAlias) + if err != nil { + return vterrors.Wrapf(err, "GetTablet(%v) failed", target.MasterAlias) + } + query := fmt.Sprintf("update _vt.vreplication set state='Running' where db_name=%s and workflow in (%s)", encodeString(master.DbName()), stringListify(list)) + if _, err := rs.wr.tmc.VReplicationExec(ctx, master.Tablet, query); err != nil { + return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", target.MasterAlias, query) + } + return nil + }) + return err +} + +func (rs *resharder) forAll(shards []*topo.ShardInfo, f func(*topo.ShardInfo) error) error { + var wg sync.WaitGroup + allErrors := &concurrency.AllErrorRecorder{} + for _, shard := range shards { + wg.Add(1) + go func(shard *topo.ShardInfo) { + defer wg.Done() + + if err := f(shard); err != nil { + allErrors.RecordError(err) + } + }(shard) + } + wg.Wait() + return allErrors.AggrError(vterrors.Aggregate) +} From f75ce0edd9fbf9c0ec5e35988ba197663f62e578 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sat, 26 Oct 2019 15:31:41 -0700 Subject: [PATCH 03/10] vreplication: reshard: more validations Signed-off-by: Sugu Sougoumarane --- go/vt/wrangler/resharder.go | 101 +++++++++++++++++++++--------------- 1 file changed, 60 insertions(+), 41 deletions(-) diff --git a/go/vt/wrangler/resharder.go b/go/vt/wrangler/resharder.go index 221747d6347..36b6405feec 100644 --- a/go/vt/wrangler/resharder.go +++ b/go/vt/wrangler/resharder.go @@ -18,7 +18,6 @@ package wrangler import ( "fmt" - "sort" "strings" "sync" "time" @@ -41,13 +40,15 @@ import ( ) type resharder struct { - wr *Wrangler - keyspace string - workflow string - sourceShards []*topo.ShardInfo - targetShards []*topo.ShardInfo - vschema *vschemapb.Keyspace - refStreams map[string]*refStream + wr *Wrangler + keyspace string + workflow string + sourceShards []*topo.ShardInfo + sourceMasters map[string]*topo.TabletInfo + targetShards []*topo.ShardInfo + targetMasters map[string]*topo.TabletInfo + vschema *vschemapb.Keyspace + refStreams map[string]*refStream } type refStream struct { @@ -67,6 +68,12 @@ func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sour if err != nil { return vterrors.Wrap(err, "buildResharder") } + if err := rs.createStreams(ctx); err != nil { + return vterrors.Wrap(err, "createStreams") + } + if err := rs.startStreams(ctx); err != nil { + return vterrors.Wrap(err, "startStream") + } if err := wr.refreshMasters(ctx, rs.targetShards); err != nil { return errors.Wrap(err, "refreshMasters") } @@ -75,9 +82,11 @@ func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sour func (wr *Wrangler) buildResharder(ctx context.Context, keyspace, workflow string, sources, targets []string) (*resharder, error) { rs := &resharder{ - wr: wr, - keyspace: keyspace, - workflow: workflow, + wr: wr, + keyspace: keyspace, + workflow: workflow, + sourceMasters: make(map[string]*topo.TabletInfo), + targetMasters: make(map[string]*topo.TabletInfo), } for _, shard := range sources { si, err := wr.ts.GetShard(ctx, keyspace, shard) @@ -85,6 +94,11 @@ func (wr *Wrangler) buildResharder(ctx context.Context, keyspace, workflow strin return nil, vterrors.Wrapf(err, "GetShard(%s) failed", shard) } rs.sourceShards = append(rs.sourceShards, si) + master, err := wr.ts.GetTablet(ctx, si.MasterAlias) + if err != nil { + return nil, vterrors.Wrapf(err, "GetTablet(%s) failed", si.MasterAlias) + } + rs.sourceMasters[si.ShardName()] = master } for _, shard := range targets { si, err := wr.ts.GetShard(ctx, keyspace, shard) @@ -92,10 +106,18 @@ func (wr *Wrangler) buildResharder(ctx context.Context, keyspace, workflow strin return nil, vterrors.Wrapf(err, "GetShard(%s) failed", shard) } rs.targetShards = append(rs.targetShards, si) + master, err := wr.ts.GetTablet(ctx, si.MasterAlias) + if err != nil { + return nil, vterrors.Wrapf(err, "GetTablet(%s) failed", si.MasterAlias) + } + rs.targetMasters[si.ShardName()] = master } if err := topotools.ValidateForReshard(rs.sourceShards, rs.targetShards); err != nil { return nil, vterrors.Wrap(err, "ValidateForReshard") } + if err := rs.validateTargets(ctx); err != nil { + return nil, vterrors.Wrap(err, "validateTargets") + } vschema, err := wr.ts.GetVSchema(ctx, keyspace) if err != nil { @@ -109,18 +131,31 @@ func (wr *Wrangler) buildResharder(ctx context.Context, keyspace, workflow strin return rs, nil } +func (rs *resharder) validateTargets(ctx context.Context) error { + err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error { + targetMaster := rs.targetMasters[target.ShardName()] + query := fmt.Sprintf("select 1 from _vt.vreplication where db_name=%s", encodeString(targetMaster.DbName())) + p3qr, err := rs.wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, query) + if err != nil { + return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", targetMaster.Tablet, query) + } + if len(p3qr.Rows) != 0 { + return errors.New("some streams already exist in the target shards, please clean them up and retry the command") + } + return nil + }) + return err +} + func (rs *resharder) readRefStreams(ctx context.Context) error { var mu sync.Mutex err := rs.forAll(rs.sourceShards, func(source *topo.ShardInfo) error { - sourceMaster, err := rs.wr.ts.GetTablet(ctx, source.MasterAlias) - if err != nil { - return vterrors.Wrapf(err, "GetTablet(%v)", source.MasterAlias) - } + sourceMaster := rs.sourceMasters[source.ShardName()] query := fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name=%s", encodeString(sourceMaster.DbName())) p3qr, err := rs.wr.tmc.VReplicationExec(ctx, sourceMaster.Tablet, query) if err != nil { - return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", source.MasterAlias, query) + return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", sourceMaster.Tablet, query) } qr := sqltypes.Proto3ToResult(p3qr) @@ -235,10 +270,7 @@ func (rs *resharder) createStreams(ctx context.Context) error { } err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error { - master, err := rs.wr.ts.GetTablet(ctx, target.MasterAlias) - if err != nil { - return vterrors.Wrapf(err, "GetTablet(%v) failed", target.MasterAlias) - } + targetMaster := rs.targetMasters[target.ShardName()] buf := &strings.Builder{} buf.WriteString("insert into _vt.vreplication(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name) values ") @@ -255,7 +287,7 @@ func (rs *resharder) createStreams(ctx context.Context) error { encodeString(tabletTypes), time.Now().Unix(), binlogplayer.BlpStopped, - encodeString(master.DbName())) + encodeString(targetMaster.DbName())) prefix = ", " } @@ -283,8 +315,8 @@ func (rs *resharder) createStreams(ctx context.Context) error { addLine(rstream.workflow, rstream.bls, rstream.cell, rstream.tabletTypes) } query := buf.String() - if _, err := rs.wr.tmc.VReplicationExec(ctx, master.Tablet, query); err != nil { - return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", target.MasterAlias, query) + if _, err := rs.wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, query); err != nil { + return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", targetMaster.Tablet, query) } return nil }) @@ -292,25 +324,12 @@ func (rs *resharder) createStreams(ctx context.Context) error { return err } -func (rs *resharder) startStreaming(ctx context.Context) error { - workflows := make(map[string]bool) - workflows[rs.workflow] = true - for _, rstream := range rs.refStreams { - workflows[rstream.workflow] = true - } - list := make([]string, 0, len(workflows)) - for k := range workflows { - list = append(list, k) - } - sort.Strings(list) +func (rs *resharder) startStreams(ctx context.Context) error { err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error { - master, err := rs.wr.ts.GetTablet(ctx, target.MasterAlias) - if err != nil { - return vterrors.Wrapf(err, "GetTablet(%v) failed", target.MasterAlias) - } - query := fmt.Sprintf("update _vt.vreplication set state='Running' where db_name=%s and workflow in (%s)", encodeString(master.DbName()), stringListify(list)) - if _, err := rs.wr.tmc.VReplicationExec(ctx, master.Tablet, query); err != nil { - return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", target.MasterAlias, query) + targetMaster := rs.targetMasters[target.ShardName()] + query := fmt.Sprintf("update _vt.vreplication set state='Running' where db_name=%s", encodeString(targetMaster.DbName())) + if _, err := rs.wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, query); err != nil { + return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", targetMaster.Tablet, query) } return nil }) From ec61a56bf1c777e7bf10f06d7d2fb0eb962d8923 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sat, 26 Oct 2019 17:57:59 -0700 Subject: [PATCH 04/10] vreplication: reshard: copy schema option Signed-off-by: Sugu Sougoumarane --- go/vt/vtctl/vtctl.go | 5 +++-- go/vt/wrangler/resharder.go | 24 ++++++++++++++++++++---- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 4b2052130d1..a7050ded940 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -308,7 +308,7 @@ var commands = []commandGroup{ "[-ping-tablets] ", "Validates that all nodes reachable from the specified keyspace are consistent."}, {"Reshard", commandReshard, - " ", + "[-skip_schema_copy] ", "Start a Resharding process. Example: Reshard ks.workflow001 '0' '-80,80-'"}, {"SplitClone", commandSplitClone, " ", @@ -1788,6 +1788,7 @@ func commandValidateKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlag } func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { + skipSchemaCopy := subFlags.Bool("skip_schema_copy", false, "Skip copying of schema to targets") if err := subFlags.Parse(args); err != nil { return err } @@ -1800,7 +1801,7 @@ func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.F } source := strings.Split(subFlags.Arg(1), ",") target := strings.Split(subFlags.Arg(2), ",") - return wr.Reshard(ctx, keyspace, workflow, source, target) + return wr.Reshard(ctx, keyspace, workflow, source, target, *skipSchemaCopy) } func commandSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { diff --git a/go/vt/wrangler/resharder.go b/go/vt/wrangler/resharder.go index 36b6405feec..3453cda7904 100644 --- a/go/vt/wrangler/resharder.go +++ b/go/vt/wrangler/resharder.go @@ -59,7 +59,7 @@ type refStream struct { } // Reshard initiates a resharding workflow. -func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sources, targets []string) error { +func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sources, targets []string, skipSchemaCopy bool) error { if err := wr.validateNewWorkflow(ctx, keyspace, workflow); err != nil { return err } @@ -68,15 +68,17 @@ func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sour if err != nil { return vterrors.Wrap(err, "buildResharder") } + if !skipSchemaCopy { + if err := rs.copySchema(ctx); err != nil { + return vterrors.Wrap(err, "copySchema") + } + } if err := rs.createStreams(ctx); err != nil { return vterrors.Wrap(err, "createStreams") } if err := rs.startStreams(ctx); err != nil { return vterrors.Wrap(err, "startStream") } - if err := wr.refreshMasters(ctx, rs.targetShards); err != nil { - return errors.Wrap(err, "refreshMasters") - } return nil } @@ -93,6 +95,9 @@ func (wr *Wrangler) buildResharder(ctx context.Context, keyspace, workflow strin if err != nil { return nil, vterrors.Wrapf(err, "GetShard(%s) failed", shard) } + if !si.IsMasterServing { + return nil, fmt.Errorf("source shard %v is not in serving state", shard) + } rs.sourceShards = append(rs.sourceShards, si) master, err := wr.ts.GetTablet(ctx, si.MasterAlias) if err != nil { @@ -105,6 +110,9 @@ func (wr *Wrangler) buildResharder(ctx context.Context, keyspace, workflow strin if err != nil { return nil, vterrors.Wrapf(err, "GetShard(%s) failed", shard) } + if si.IsMasterServing { + return nil, fmt.Errorf("target shard %v is in serving state", shard) + } rs.targetShards = append(rs.targetShards, si) master, err := wr.ts.GetTablet(ctx, si.MasterAlias) if err != nil { @@ -258,6 +266,14 @@ func (rs *resharder) identifyRuleType(rule *binlogdatapb.Rule) (int, error) { } } +func (rs *resharder) copySchema(ctx context.Context) error { + oneSource := rs.sourceShards[0].MasterAlias + err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error { + return rs.wr.CopySchemaShard(ctx, oneSource, []string{"/.*"}, nil, false, rs.keyspace, target.ShardName(), 1*time.Second) + }) + return err +} + func (rs *resharder) createStreams(ctx context.Context) error { var excludeRules []*binlogdatapb.Rule for tableName, table := range rs.vschema.Tables { From 6cbf28f288ec81e4ea7e5f45b657cdb5af04bc6e Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sat, 26 Oct 2019 21:52:01 -0700 Subject: [PATCH 05/10] vreplication: reshard basic test & framework Signed-off-by: Sugu Sougoumarane --- go/vt/wrangler/resharder_env_test.go | 172 +++++++++++++++++++++++++++ go/vt/wrangler/resharder_test.go | 62 ++++++++++ 2 files changed, 234 insertions(+) create mode 100644 go/vt/wrangler/resharder_env_test.go create mode 100644 go/vt/wrangler/resharder_test.go diff --git a/go/vt/wrangler/resharder_env_test.go b/go/vt/wrangler/resharder_env_test.go new file mode 100644 index 00000000000..a13cb069e58 --- /dev/null +++ b/go/vt/wrangler/resharder_env_test.go @@ -0,0 +1,172 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrangler + +import ( + "fmt" + "regexp" + + "golang.org/x/net/context" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/logutil" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/vttablet/tmclient" +) + +type testResharderEnv struct { + wr *Wrangler + keyspace string + workflow string + sources []string + targets []string + tablets map[int]*topodatapb.Tablet + topoServ *topo.Server + cell string + tmc *testResharderTMClient +} + +//---------------------------------------------- +// testResharderEnv + +func newTestResharderEnv(sources, targets []string) *testResharderEnv { + env := &testResharderEnv{ + keyspace: "ks", + workflow: "resharderTest", + sources: sources, + targets: targets, + tablets: make(map[int]*topodatapb.Tablet), + topoServ: memorytopo.NewServer("cell"), + cell: "cell", + tmc: newTestResharderTMClient(), + } + env.wr = New(logutil.NewConsoleLogger(), env.topoServ, env.tmc) + + tabletID := 100 + for _, shard := range sources { + master := env.addTablet(tabletID, env.keyspace, shard, topodatapb.TabletType_MASTER) + + // wr.validateNewWorkflow + env.tmc.setVRResults(master, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + // readRefStreams + env.tmc.setVRResults(master, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), &sqltypes.Result{}) + + tabletID += 10 + } + tabletID = 200 + for _, shard := range targets { + master := env.addTablet(tabletID, env.keyspace, shard, topodatapb.TabletType_MASTER) + + // wr.validateNewWorkflow + env.tmc.setVRResults(master, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + // validateTargets + env.tmc.setVRResults(master, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s'", env.keyspace), &sqltypes.Result{}) + + tabletID += 10 + } + return env +} + +func (env *testResharderEnv) close() { + for _, t := range env.tablets { + env.deleteTablet(t) + } +} + +func (env *testResharderEnv) addTablet(id int, keyspace, shard string, tabletType topodatapb.TabletType) *topodatapb.Tablet { + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: env.cell, + Uid: uint32(id), + }, + Keyspace: keyspace, + Shard: shard, + KeyRange: &topodatapb.KeyRange{}, + Type: tabletType, + PortMap: map[string]int32{ + "test": int32(id), + }, + } + env.tablets[id] = tablet + if err := env.wr.InitTablet(context.Background(), tablet, false /* allowMasterOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil { + panic(err) + } + return tablet +} + +func (env *testResharderEnv) deleteTablet(tablet *topodatapb.Tablet) { + env.topoServ.DeleteTablet(context.Background(), tablet.Alias) + delete(env.tablets, int(tablet.Alias.Uid)) +} + +//---------------------------------------------- +// testResharderTMClient + +type testResharderTMClient struct { + tmclient.TabletManagerClient + schema *tabletmanagerdatapb.SchemaDefinition + vrQueries map[int]map[string]*querypb.QueryResult + vrQueriesRE map[int]map[string]*querypb.QueryResult +} + +func newTestResharderTMClient() *testResharderTMClient { + return &testResharderTMClient{ + vrQueries: make(map[int]map[string]*querypb.QueryResult), + vrQueriesRE: make(map[int]map[string]*querypb.QueryResult), + } +} + +func (tmc *testResharderTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { + return tmc.schema, nil +} + +func (tmc *testResharderTMClient) setVRResults(tablet *topodatapb.Tablet, query string, result *sqltypes.Result) { + queries, ok := tmc.vrQueries[int(tablet.Alias.Uid)] + if !ok { + queries = make(map[string]*querypb.QueryResult) + tmc.vrQueries[int(tablet.Alias.Uid)] = queries + } + queries[query] = sqltypes.ResultToProto3(result) +} + +func (tmc *testResharderTMClient) setVRResultsRE(tablet *topodatapb.Tablet, query string, result *sqltypes.Result) { + queriesRE, ok := tmc.vrQueriesRE[int(tablet.Alias.Uid)] + if !ok { + queriesRE = make(map[string]*querypb.QueryResult) + tmc.vrQueriesRE[int(tablet.Alias.Uid)] = queriesRE + } + queriesRE[query] = sqltypes.ResultToProto3(result) +} + +func (tmc *testResharderTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { + result, ok := tmc.vrQueries[int(tablet.Alias.Uid)][query] + if ok { + return result, nil + } + queriesRE, ok := tmc.vrQueriesRE[int(tablet.Alias.Uid)] + if ok { + for re, result := range queriesRE { + if regexp.MustCompile(re).MatchString(query) { + return result, nil + } + } + } + return nil, fmt.Errorf("query %q not found for tablet %d", query, tablet.Alias.Uid) +} diff --git a/go/vt/wrangler/resharder_test.go b/go/vt/wrangler/resharder_test.go new file mode 100644 index 00000000000..9adef4b578a --- /dev/null +++ b/go/vt/wrangler/resharder_test.go @@ -0,0 +1,62 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrangler + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "golang.org/x/net/context" + "vitess.io/vitess/go/sqltypes" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" +) + +func TestResharderSimple(t *testing.T) { + sources := []string{"0"} + targets := []string{"-80", "80-"} + env := newTestResharderEnv(sources, targets) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + env.tmc.setVRResultsRE( + env.tablets[200], + `insert into _vt.vreplication\(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name\) values `+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', 9223372036854775807, 9223372036854775807, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`, + &sqltypes.Result{}, + ) + env.tmc.setVRResultsRE( + env.tablets[210], + `insert into _vt.vreplication\(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name\) values `+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', 9223372036854775807, 9223372036854775807, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`, + &sqltypes.Result{}, + ) + + env.tmc.setVRResults(env.tablets[200], "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + env.tmc.setVRResults(env.tablets[210], "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + assert.NoError(t, err) +} From 3508f36a9fd7f4a157c143aaabf9c8c01f3a571b Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sun, 27 Oct 2019 10:27:43 -0700 Subject: [PATCH 06/10] vreplication: reshard: stricter tests Signed-off-by: Sugu Sougoumarane --- go/vt/wrangler/resharder_env_test.go | 89 +++++++++++++++++----------- go/vt/wrangler/resharder_test.go | 29 ++++----- 2 files changed, 68 insertions(+), 50 deletions(-) diff --git a/go/vt/wrangler/resharder_env_test.go b/go/vt/wrangler/resharder_env_test.go index a13cb069e58..22a0971861c 100644 --- a/go/vt/wrangler/resharder_env_test.go +++ b/go/vt/wrangler/resharder_env_test.go @@ -19,6 +19,8 @@ package wrangler import ( "fmt" "regexp" + "sync" + "testing" "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" @@ -61,23 +63,23 @@ func newTestResharderEnv(sources, targets []string) *testResharderEnv { tabletID := 100 for _, shard := range sources { - master := env.addTablet(tabletID, env.keyspace, shard, topodatapb.TabletType_MASTER) + _ = env.addTablet(tabletID, env.keyspace, shard, topodatapb.TabletType_MASTER) // wr.validateNewWorkflow - env.tmc.setVRResults(master, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + env.tmc.expectVRQuery(tabletID, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) // readRefStreams - env.tmc.setVRResults(master, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), &sqltypes.Result{}) + env.tmc.expectVRQuery(tabletID, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), &sqltypes.Result{}) tabletID += 10 } tabletID = 200 for _, shard := range targets { - master := env.addTablet(tabletID, env.keyspace, shard, topodatapb.TabletType_MASTER) + _ = env.addTablet(tabletID, env.keyspace, shard, topodatapb.TabletType_MASTER) // wr.validateNewWorkflow - env.tmc.setVRResults(master, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + env.tmc.expectVRQuery(tabletID, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) // validateTargets - env.tmc.setVRResults(master, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s'", env.keyspace), &sqltypes.Result{}) + env.tmc.expectVRQuery(tabletID, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s'", env.keyspace), &sqltypes.Result{}) tabletID += 10 } @@ -121,15 +123,20 @@ func (env *testResharderEnv) deleteTablet(tablet *topodatapb.Tablet) { type testResharderTMClient struct { tmclient.TabletManagerClient - schema *tabletmanagerdatapb.SchemaDefinition - vrQueries map[int]map[string]*querypb.QueryResult - vrQueriesRE map[int]map[string]*querypb.QueryResult + schema *tabletmanagerdatapb.SchemaDefinition + + mu sync.Mutex + vrQueries map[int][]*queryResult +} + +type queryResult struct { + query string + result *querypb.QueryResult } func newTestResharderTMClient() *testResharderTMClient { return &testResharderTMClient{ - vrQueries: make(map[int]map[string]*querypb.QueryResult), - vrQueriesRE: make(map[int]map[string]*querypb.QueryResult), + vrQueries: make(map[int][]*queryResult), } } @@ -137,36 +144,46 @@ func (tmc *testResharderTMClient) GetSchema(ctx context.Context, tablet *topodat return tmc.schema, nil } -func (tmc *testResharderTMClient) setVRResults(tablet *topodatapb.Tablet, query string, result *sqltypes.Result) { - queries, ok := tmc.vrQueries[int(tablet.Alias.Uid)] - if !ok { - queries = make(map[string]*querypb.QueryResult) - tmc.vrQueries[int(tablet.Alias.Uid)] = queries - } - queries[query] = sqltypes.ResultToProto3(result) -} +func (tmc *testResharderTMClient) expectVRQuery(tabletID int, query string, result *sqltypes.Result) { + tmc.mu.Lock() + defer tmc.mu.Unlock() -func (tmc *testResharderTMClient) setVRResultsRE(tablet *topodatapb.Tablet, query string, result *sqltypes.Result) { - queriesRE, ok := tmc.vrQueriesRE[int(tablet.Alias.Uid)] - if !ok { - queriesRE = make(map[string]*querypb.QueryResult) - tmc.vrQueriesRE[int(tablet.Alias.Uid)] = queriesRE - } - queriesRE[query] = sqltypes.ResultToProto3(result) + tmc.vrQueries[tabletID] = append(tmc.vrQueries[tabletID], &queryResult{ + query: query, + result: sqltypes.ResultToProto3(result), + }) } func (tmc *testResharderTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { - result, ok := tmc.vrQueries[int(tablet.Alias.Uid)][query] - if ok { - return result, nil + tmc.mu.Lock() + defer tmc.mu.Unlock() + + qrs := tmc.vrQueries[int(tablet.Alias.Uid)] + if len(qrs) == 0 { + return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query) } - queriesRE, ok := tmc.vrQueriesRE[int(tablet.Alias.Uid)] - if ok { - for re, result := range queriesRE { - if regexp.MustCompile(re).MatchString(query) { - return result, nil - } + matched := false + if qrs[0].query[0] == '/' { + matched = regexp.MustCompile(qrs[0].query[1:]).MatchString(query) + } else { + matched = query == qrs[0].query + } + if !matched { + return nil, fmt.Errorf("tablet %v: unexpected query %s, want: %s", tablet, query, qrs[0].query) + } + tmc.vrQueries[int(tablet.Alias.Uid)] = qrs[1:] + return qrs[0].result, nil +} + +func (tmc *testResharderTMClient) verifyQueries(t *testing.T) { + t.Helper() + + tmc.mu.Lock() + defer tmc.mu.Unlock() + + for tabletID, qrs := range tmc.vrQueries { + if len(qrs) != 0 { + t.Errorf("tablet %v: has unreturned results: %v", tabletID, qrs) } } - return nil, fmt.Errorf("query %q not found for tablet %d", query, tablet.Alias.Uid) } diff --git a/go/vt/wrangler/resharder_test.go b/go/vt/wrangler/resharder_test.go index 9adef4b578a..f3436ca8b54 100644 --- a/go/vt/wrangler/resharder_test.go +++ b/go/vt/wrangler/resharder_test.go @@ -25,10 +25,10 @@ import ( tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" ) -func TestResharderSimple(t *testing.T) { - sources := []string{"0"} - targets := []string{"-80", "80-"} - env := newTestResharderEnv(sources, targets) +const insertPrefix = `/insert into _vt.vreplication\(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name\) values ` + +func TestResharderOneToMany(t *testing.T) { + env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -41,22 +41,23 @@ func TestResharderSimple(t *testing.T) { } env.tmc.schema = schm - env.tmc.setVRResultsRE( - env.tablets[200], - `insert into _vt.vreplication\(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name\) values `+ - `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', 9223372036854775807, 9223372036854775807, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`, + env.tmc.expectVRQuery( + 200, + insertPrefix+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`, &sqltypes.Result{}, ) - env.tmc.setVRResultsRE( - env.tablets[210], - `insert into _vt.vreplication\(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name\) values `+ - `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', 9223372036854775807, 9223372036854775807, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`, + env.tmc.expectVRQuery( + 210, + insertPrefix+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`, &sqltypes.Result{}, ) - env.tmc.setVRResults(env.tablets[200], "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) - env.tmc.setVRResults(env.tablets[210], "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) assert.NoError(t, err) + env.tmc.verifyQueries(t) } From 92498db45d73fae347fafb104bb0920edd8d3632 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sun, 27 Oct 2019 10:43:48 -0700 Subject: [PATCH 07/10] topo: modify CreateShard IsMasterServing rule CreateShard starts off all unsharded shards as serving. It breaks if you create shards -80, 80-, and then 0 (with the intention to reshard to 0). It was previously allowed because we allowed the old style of custom sharding where you could create shards 0, 1, 2, etc. But we don't support that any more. The new rule sets master as serving only if the keyranges don't overlap. This is applied for unsharded keyspaces also. Signed-off-by: Sugu Sougoumarane --- go/vt/topo/shard.go | 31 ++++++++++++------------------- go/vt/topotools/shard_test.go | 10 ++++++---- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index 49d75847cf4..30fd5db1cda 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -279,7 +279,7 @@ func (ts *Server) CreateShard(ctx context.Context, keyspace, shard string) (err defer unlock(&err) // validate parameters - name, keyRange, err := ValidateShardName(shard) + _, keyRange, err := ValidateShardName(shard) if err != nil { return err } @@ -288,27 +288,20 @@ func (ts *Server) CreateShard(ctx context.Context, keyspace, shard string) (err KeyRange: keyRange, } - isMasterServing := true - - // start the shard IsMasterServing. If it overlaps with - // other shards for some serving types, remove them. - - if IsShardUsingRangeBasedSharding(name) { - // if we are using range-based sharding, we don't want - // overlapping shards to all serve and confuse the clients. - sis, err := ts.FindAllShardsInKeyspace(ctx, keyspace) - if err != nil && !IsErrType(err, NoNode) { - return err - } - for _, si := range sis { - if si.KeyRange == nil || key.KeyRangesIntersect(si.KeyRange, keyRange) { - isMasterServing = false - } + // Set master as serving only if its keyrange doesn't overlap + // with other shards. This applies to unsharded keyspaces also + value.IsMasterServing = true + sis, err := ts.FindAllShardsInKeyspace(ctx, keyspace) + if err != nil && !IsErrType(err, NoNode) { + return err + } + for _, si := range sis { + if si.KeyRange == nil || key.KeyRangesIntersect(si.KeyRange, keyRange) { + value.IsMasterServing = false + break } } - value.IsMasterServing = isMasterServing - // Marshal and save. data, err := proto.Marshal(value) if err != nil { diff --git a/go/vt/topotools/shard_test.go b/go/vt/topotools/shard_test.go index 521d163d378..68ab7be9a99 100644 --- a/go/vt/topotools/shard_test.go +++ b/go/vt/topotools/shard_test.go @@ -55,9 +55,11 @@ func TestCreateShard(t *testing.T) { } } -// TestCreateShardCustomSharding checks ServedTypes is set correctly -// when creating multiple custom sharding shards -func TestCreateShardCustomSharding(t *testing.T) { +// TestCreateShardMultiUnsharded checks ServedTypes is set +// only for the first created shard. +// TODO(sougou): we should eventually disallow multiple shards +// for unsharded keyspaces. +func TestCreateShardMultiUnsharded(t *testing.T) { ctx := context.Background() // Set up topology. @@ -90,7 +92,7 @@ func TestCreateShardCustomSharding(t *testing.T) { if si, err := ts.GetShard(ctx, keyspace, shard1); err != nil { t.Fatalf("GetShard(shard1) failed: %v", err) } else { - if !si.IsMasterServing { + if si.IsMasterServing { t.Fatalf("shard1 should have all 3 served types") } } From 190e642dd53aced43951de15f4f8ff9c3ae70025 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sun, 27 Oct 2019 14:31:23 -0700 Subject: [PATCH 08/10] vreplication: resharder all tests Signed-off-by: Sugu Sougoumarane --- go/vt/wrangler/resharder.go | 14 +- go/vt/wrangler/resharder_env_test.go | 42 +- go/vt/wrangler/resharder_test.go | 709 ++++++++++++++++++++++++++- 3 files changed, 741 insertions(+), 24 deletions(-) diff --git a/go/vt/wrangler/resharder.go b/go/vt/wrangler/resharder.go index 3453cda7904..d03a1d87eef 100644 --- a/go/vt/wrangler/resharder.go +++ b/go/vt/wrangler/resharder.go @@ -36,7 +36,6 @@ import ( "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" ) type resharder struct { @@ -254,16 +253,9 @@ func (rs *resharder) identifyRuleType(rule *binlogdatapb.Rule) (int, error) { if vtable.Type == vindexes.TypeReference { return reference, nil } - switch { - case rule.Filter == "": - return unknown, fmt.Errorf("rule %v does not have a select expression in vreplication", rule) - case key.IsKeyRange(rule.Filter): - return sharded, nil - case rule.Filter == vreplication.ExcludeStr: - return unknown, fmt.Errorf("unexpected rule in vreplication: %v", rule) - default: - return sharded, nil - } + // In this case, 'sharded' means that it's not a reference + // table. We don't care about any other subtleties. + return sharded, nil } func (rs *resharder) copySchema(ctx context.Context) error { diff --git a/go/vt/wrangler/resharder_env_test.go b/go/vt/wrangler/resharder_env_test.go index 22a0971861c..569783ba6f0 100644 --- a/go/vt/wrangler/resharder_env_test.go +++ b/go/vt/wrangler/resharder_env_test.go @@ -64,26 +64,37 @@ func newTestResharderEnv(sources, targets []string) *testResharderEnv { tabletID := 100 for _, shard := range sources { _ = env.addTablet(tabletID, env.keyspace, shard, topodatapb.TabletType_MASTER) - - // wr.validateNewWorkflow - env.tmc.expectVRQuery(tabletID, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) - // readRefStreams - env.tmc.expectVRQuery(tabletID, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), &sqltypes.Result{}) - tabletID += 10 } tabletID = 200 for _, shard := range targets { _ = env.addTablet(tabletID, env.keyspace, shard, topodatapb.TabletType_MASTER) + tabletID += 10 + } + return env +} +func (env *testResharderEnv) expectValidation() { + for _, tablet := range env.tablets { + tabletID := int(tablet.Alias.Uid) // wr.validateNewWorkflow env.tmc.expectVRQuery(tabletID, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) - // validateTargets - env.tmc.expectVRQuery(tabletID, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s'", env.keyspace), &sqltypes.Result{}) - tabletID += 10 + if tabletID >= 200 { + // validateTargets + env.tmc.expectVRQuery(tabletID, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s'", env.keyspace), &sqltypes.Result{}) + } + } +} + +func (env *testResharderEnv) expectNoRefStream() { + for _, tablet := range env.tablets { + tabletID := int(tablet.Alias.Uid) + if tabletID < 200 { + // readRefStreams + env.tmc.expectVRQuery(tabletID, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), &sqltypes.Result{}) + } } - return env } func (env *testResharderEnv) close() { @@ -175,6 +186,11 @@ func (tmc *testResharderTMClient) VReplicationExec(ctx context.Context, tablet * return qrs[0].result, nil } +func (tmc *testResharderTMClient) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, query []byte, maxRows int, disableBinlogs, reloadSchema bool) (*querypb.QueryResult, error) { + // Reuse VReplicationExec + return tmc.VReplicationExec(ctx, tablet, string(query)) +} + func (tmc *testResharderTMClient) verifyQueries(t *testing.T) { t.Helper() @@ -183,7 +199,11 @@ func (tmc *testResharderTMClient) verifyQueries(t *testing.T) { for tabletID, qrs := range tmc.vrQueries { if len(qrs) != 0 { - t.Errorf("tablet %v: has unreturned results: %v", tabletID, qrs) + var list []string + for _, qr := range qrs { + list = append(list, qr.query) + } + t.Errorf("tablet %v: has unreturned results: %v", tabletID, list) } } } diff --git a/go/vt/wrangler/resharder_test.go b/go/vt/wrangler/resharder_test.go index f3436ca8b54..d9214c8248d 100644 --- a/go/vt/wrangler/resharder_test.go +++ b/go/vt/wrangler/resharder_test.go @@ -17,15 +17,21 @@ limitations under the License. package wrangler import ( + "fmt" + "strings" "testing" "github.com/stretchr/testify/assert" "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + "vitess.io/vitess/go/vt/vtgate/vindexes" ) const insertPrefix = `/insert into _vt.vreplication\(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name\) values ` +const eol = "$" func TestResharderOneToMany(t *testing.T) { env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) @@ -41,16 +47,293 @@ func TestResharderOneToMany(t *testing.T) { } env.tmc.schema = schm + env.expectValidation() + env.expectNoRefStream() + + env.tmc.expectVRQuery( + 200, + insertPrefix+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`+ + eol, + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery( + 210, + insertPrefix+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`+ + eol, + &sqltypes.Result{}, + ) + + env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + assert.NoError(t, err) + env.tmc.verifyQueries(t) +} + +func TestResharderManyToOne(t *testing.T) { + env := newTestResharderEnv([]string{"-80", "80-"}, []string{"0"}) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + env.expectValidation() + env.expectNoRefStream() + + env.tmc.expectVRQuery( + 200, + insertPrefix+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"-80\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\).*`+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"80-\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`+ + eol, + &sqltypes.Result{}, + ) + + env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + assert.NoError(t, err) + env.tmc.verifyQueries(t) +} + +func TestResharderManyToMany(t *testing.T) { + env := newTestResharderEnv([]string{"-40", "40-"}, []string{"-80", "80-"}) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + env.expectValidation() + env.expectNoRefStream() + + env.tmc.expectVRQuery( + 200, + insertPrefix+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"-40\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\).*`+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"40-\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`+ + eol, + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery( + 210, + insertPrefix+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"40-\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`+ + eol, + &sqltypes.Result{}, + ) + + env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + assert.NoError(t, err) + env.tmc.verifyQueries(t) +} + +// TestResharderOneRefTable tests the case where there's one ref table, but no stream for it. +// This means that the table is being updated manually. +func TestResharderOneRefTable(t *testing.T) { + env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + vs := &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "t1": { + Type: vindexes.TypeReference, + }, + }, + } + if err := env.wr.ts.SaveVSchema(context.Background(), env.keyspace, vs); err != nil { + t.Fatal(err) + } + + env.expectValidation() + env.expectNoRefStream() + + env.tmc.expectVRQuery( + 200, + insertPrefix+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: rules: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`+ + eol, + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery( + 210, + insertPrefix+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: rules: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`+ + eol, + &sqltypes.Result{}, + ) + + env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + assert.NoError(t, err) + env.tmc.verifyQueries(t) +} + +// TestResharderOneRefStream tests the case where there's one ref table and an associated stream. +func TestResharderOneRefStream(t *testing.T) { + env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + vs := &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "t1": { + Type: vindexes.TypeReference, + }, + }, + } + if err := env.wr.ts.SaveVSchema(context.Background(), env.keyspace, vs); err != nil { + t.Fatal(err) + } + + env.expectValidation() + + bls := &binlogdatapb.BinlogSource{ + Keyspace: "ks1", + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + }}, + }, + } + result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "workflow|source|cell|tablet_types", + "varchar|varchar|varchar|varchar"), + fmt.Sprintf("t1|%v|cell1|master,replica", bls), + ) + env.tmc.expectVRQuery(100, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result) + + refRow := `\('t1', 'keyspace:\\"ks1\\" shard:\\"0\\" filter: > ', '', [0-9]*, [0-9]*, 'cell1', 'master,replica', [0-9]*, 0, 'Stopped', 'vt_ks'\)` + env.tmc.expectVRQuery( + 200, + insertPrefix+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: rules: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\).*`+ + refRow+eol, + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery( + 210, + insertPrefix+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: rules: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\).*`+ + refRow+eol, + &sqltypes.Result{}, + ) + + env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + assert.NoError(t, err) + env.tmc.verifyQueries(t) +} + +// TestResharderNoRefStream tests the case where there's a stream, but it's not a reference. +func TestResharderNoRefStream(t *testing.T) { + env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + vs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + }, + } + if err := env.wr.ts.SaveVSchema(context.Background(), env.keyspace, vs); err != nil { + t.Fatal(err) + } + + env.expectValidation() + + bls := &binlogdatapb.BinlogSource{ + Keyspace: "ks1", + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t2", + }}, + }, + } + result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "workflow|source|cell|tablet_types", + "varchar|varchar|varchar|varchar"), + fmt.Sprintf("t1|%v|cell1|master,replica", bls), + ) + env.tmc.expectVRQuery(100, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result) + env.tmc.expectVRQuery( 200, insertPrefix+ - `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`, + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`+ + eol, &sqltypes.Result{}, ) env.tmc.expectVRQuery( 210, insertPrefix+ - `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`, + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`+ + eol, &sqltypes.Result{}, ) @@ -61,3 +344,425 @@ func TestResharderOneToMany(t *testing.T) { assert.NoError(t, err) env.tmc.verifyQueries(t) } + +func TestResharderCopySchema(t *testing.T) { + env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + env.expectValidation() + env.expectNoRefStream() + + // These queries confirm that the copy schema function is getting called. + env.tmc.expectVRQuery(100, "SELECT 1 FROM information_schema.tables WHERE table_schema = '_vt' AND table_name = 'shard_metadata'", &sqltypes.Result{}) + env.tmc.expectVRQuery(100, "SELECT 1 FROM information_schema.tables WHERE table_schema = '_vt' AND table_name = 'shard_metadata'", &sqltypes.Result{}) + + env.tmc.expectVRQuery( + 200, + insertPrefix+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`+ + eol, + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery( + 210, + insertPrefix+ + `\('resharderTest', 'keyspace:\\"ks\\" shard:\\"0\\" filter: > ', '', [0-9]*, [0-9]*, '', '', [0-9]*, 0, 'Stopped', 'vt_ks'\)`+ + eol, + &sqltypes.Result{}, + ) + + env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + env.tmc.expectVRQuery(210, "update _vt.vreplication set state='Running' where db_name='vt_ks'", &sqltypes.Result{}) + + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, false) + assert.NoError(t, err) + env.tmc.verifyQueries(t) +} + +func TestResharderDupWorkflow(t *testing.T) { + env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + env.tmc.expectVRQuery(100, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + env.tmc.expectVRQuery(200, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "1", + "int64"), + "1", + ) + env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), result) + + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + assert.EqualError(t, err, "workflow resharderTest already exists in keyspace ks") + env.tmc.verifyQueries(t) +} + +func TestResharderServingState(t *testing.T) { + env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + env.tmc.expectVRQuery(100, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + env.tmc.expectVRQuery(200, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, []string{"-80"}, nil, true) + assert.EqualError(t, err, "buildResharder: source shard -80 is not in serving state") + + env.tmc.expectVRQuery(100, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + env.tmc.expectVRQuery(200, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + err = env.wr.Reshard(context.Background(), env.keyspace, env.workflow, []string{"0"}, []string{"0"}, true) + assert.EqualError(t, err, "buildResharder: target shard 0 is in serving state") + + env.tmc.expectVRQuery(100, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + env.tmc.expectVRQuery(200, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + err = env.wr.Reshard(context.Background(), env.keyspace, env.workflow, []string{"0"}, []string{"-80"}, true) + assert.EqualError(t, err, "buildResharder: ValidateForReshard: source and target keyranges don't match: - vs -80") +} + +func TestResharderTargetAlreadyResharding(t *testing.T) { + env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + env.tmc.expectVRQuery(100, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + env.tmc.expectVRQuery(200, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", env.keyspace, env.workflow), &sqltypes.Result{}) + + result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "1", + "int64"), + "1", + ) + env.tmc.expectVRQuery(200, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s'", env.keyspace), result) + env.tmc.expectVRQuery(210, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s'", env.keyspace), &sqltypes.Result{}) + + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + assert.EqualError(t, err, "buildResharder: validateTargets: some streams already exist in the target shards, please clean them up and retry the command") + env.tmc.verifyQueries(t) +} + +func TestResharderUnnamedStream(t *testing.T) { + env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + vs := &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "t1": { + Type: vindexes.TypeReference, + }, + }, + } + if err := env.wr.ts.SaveVSchema(context.Background(), env.keyspace, vs); err != nil { + t.Fatal(err) + } + + env.expectValidation() + + bls := &binlogdatapb.BinlogSource{ + Keyspace: "ks1", + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + }}, + }, + } + result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "workflow|source|cell|tablet_types", + "varchar|varchar|varchar|varchar"), + fmt.Sprintf("|%v|cell1|master,replica", bls), + ) + env.tmc.expectVRQuery(100, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result) + + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + assert.EqualError(t, err, "buildResharder: readRefStreams: VReplication streams must have named workflows for migration: shard: ks:0") + env.tmc.verifyQueries(t) +} + +func TestResharderMismatchedRefStreams(t *testing.T) { + env := newTestResharderEnv([]string{"-80", "80-"}, []string{"0"}) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + vs := &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "t1": { + Type: vindexes.TypeReference, + }, + }, + } + if err := env.wr.ts.SaveVSchema(context.Background(), env.keyspace, vs); err != nil { + t.Fatal(err) + } + + env.expectValidation() + + bls1 := &binlogdatapb.BinlogSource{ + Keyspace: "ks1", + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + }}, + }, + } + result1 := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "workflow|source|cell|tablet_types", + "varchar|varchar|varchar|varchar"), + fmt.Sprintf("t1|%v|cell1|master,replica", bls1), + ) + env.tmc.expectVRQuery(100, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result1) + bls2 := &binlogdatapb.BinlogSource{ + Keyspace: "ks2", + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + }}, + }, + } + result2 := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "workflow|source|cell|tablet_types", + "varchar|varchar|varchar|varchar"), + fmt.Sprintf("t1|%v|cell1|master,replica", bls1), + fmt.Sprintf("t1|%v|cell1|master,replica", bls2), + ) + env.tmc.expectVRQuery(110, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result2) + + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + want := "buildResharder: readRefStreams: streams are mismatched across source shards" + if err == nil || !strings.HasPrefix(err.Error(), want) { + t.Errorf("Reshard err: %v, want %v", err, want) + } + env.tmc.verifyQueries(t) +} + +func TestResharderTableNotInVSchema(t *testing.T) { + env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + env.expectValidation() + + bls := &binlogdatapb.BinlogSource{ + Keyspace: "ks1", + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + }}, + }, + } + result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "workflow|source|cell|tablet_types", + "varchar|varchar|varchar|varchar"), + fmt.Sprintf("t1|%v|cell1|master,replica", bls), + ) + env.tmc.expectVRQuery(100, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result) + + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + assert.EqualError(t, err, "buildResharder: readRefStreams: blsIsReference: table t1 not found in vschema") + env.tmc.verifyQueries(t) +} + +func TestResharderMixedTablesOrder1(t *testing.T) { + env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + vs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + "t2": { + Type: vindexes.TypeReference, + }, + }, + } + if err := env.wr.ts.SaveVSchema(context.Background(), env.keyspace, vs); err != nil { + t.Fatal(err) + } + + env.expectValidation() + + bls := &binlogdatapb.BinlogSource{ + Keyspace: "ks1", + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t2", + }, { + Match: "t2", + Filter: "select * from t2", + }}, + }, + } + result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "workflow|source|cell|tablet_types", + "varchar|varchar|varchar|varchar"), + fmt.Sprintf("t1t2|%v|cell1|master,replica", bls), + ) + env.tmc.expectVRQuery(100, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result) + + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + want := "buildResharder: readRefStreams: blsIsReference: cannot reshard streams with a mix of reference and sharded tables" + if err == nil || !strings.HasPrefix(err.Error(), want) { + t.Errorf("Reshard err: %v, want %v", err.Error(), want) + } + env.tmc.verifyQueries(t) +} + +func TestResharderMixedTablesOrder2(t *testing.T) { + env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + defer env.close() + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + env.tmc.schema = schm + + vs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + "t2": { + Type: vindexes.TypeReference, + }, + }, + } + if err := env.wr.ts.SaveVSchema(context.Background(), env.keyspace, vs); err != nil { + t.Fatal(err) + } + + env.expectValidation() + + bls := &binlogdatapb.BinlogSource{ + Keyspace: "ks1", + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t2", + Filter: "select * from t2", + }, { + Match: "t1", + Filter: "select * from t2", + }}, + }, + } + result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "workflow|source|cell|tablet_types", + "varchar|varchar|varchar|varchar"), + fmt.Sprintf("t1t2|%v|cell1|master,replica", bls), + ) + env.tmc.expectVRQuery(100, fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s'", env.keyspace), result) + + err := env.wr.Reshard(context.Background(), env.keyspace, env.workflow, env.sources, env.targets, true) + want := "buildResharder: readRefStreams: blsIsReference: cannot reshard streams with a mix of reference and sharded tables" + if err == nil || !strings.HasPrefix(err.Error(), want) { + t.Errorf("Reshard err: %v, want %v", err.Error(), want) + } + env.tmc.verifyQueries(t) +} From c63cebcab188f02bf5e58b1b8105840c022928bf Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Mon, 25 Nov 2019 18:11:25 -0800 Subject: [PATCH 09/10] vreplication: reshard: fix broken test Signed-off-by: Sugu Sougoumarane --- go/vt/wrangler/keyspace.go | 4 ++++ go/vt/wrangler/resharder_env_test.go | 9 +++++++++ 2 files changed, 13 insertions(+) diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index 04dd6bdbaef..7a53014f464 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -100,6 +100,10 @@ func (wr *Wrangler) validateNewWorkflow(ctx context.Context, keyspace, workflow var wg sync.WaitGroup allErrors := &concurrency.AllErrorRecorder{} for _, si := range allshards { + if si.MasterAlias == nil { + allErrors.RecordError(fmt.Errorf("shard has no master: %v", si)) + continue + } wg.Add(1) go func(si *topo.ShardInfo) { defer wg.Done() diff --git a/go/vt/wrangler/resharder_env_test.go b/go/vt/wrangler/resharder_env_test.go index 569783ba6f0..dfdb156cc33 100644 --- a/go/vt/wrangler/resharder_env_test.go +++ b/go/vt/wrangler/resharder_env_test.go @@ -121,6 +121,15 @@ func (env *testResharderEnv) addTablet(id int, keyspace, shard string, tabletTyp if err := env.wr.InitTablet(context.Background(), tablet, false /* allowMasterOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil { panic(err) } + if tabletType == topodatapb.TabletType_MASTER { + _, err := env.wr.ts.UpdateShardFields(context.Background(), keyspace, shard, func(si *topo.ShardInfo) error { + si.MasterAlias = tablet.Alias + return nil + }) + if err != nil { + panic(err) + } + } return tablet } From 09e34ab52572684dfd28f5d499960edfbc9998a2 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Mon, 2 Dec 2019 12:42:50 -0800 Subject: [PATCH 10/10] topo: fix obsolete/broken tests Signed-off-by: Sugu Sougoumarane --- test/config.json | 11 -- test/custom_sharding.py | 298 ---------------------------------------- test/keyspace_test.py | 18 +-- 3 files changed, 2 insertions(+), 325 deletions(-) delete mode 100755 test/custom_sharding.py diff --git a/test/config.json b/test/config.json index 273fa24bd1a..a8648f96063 100644 --- a/test/config.json +++ b/test/config.json @@ -98,17 +98,6 @@ "RetryMax": 1, "Tags": [] }, - "custom_sharding": { - "File": "custom_sharding.py", - "Args": [], - "Command": [], - "Manual": false, - "Shard": 4, - "RetryMax": 0, - "Tags": [ - "worker_test" - ] - }, "encrypted_replication": { "File": "encrypted_replication.py", "Args": [], diff --git a/test/custom_sharding.py b/test/custom_sharding.py deleted file mode 100755 index a3197cbb2e5..00000000000 --- a/test/custom_sharding.py +++ /dev/null @@ -1,298 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2019 The Vitess Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import base64 -import unittest - -import environment -import tablet -import utils - -from vtproto import topodata_pb2 - -from vtdb import vtgate_client - -# shards need at least 1 replica for semi-sync ACK, and 1 rdonly for SplitQuery. -shard_0_master = tablet.Tablet() -shard_0_replica = tablet.Tablet() -shard_0_rdonly = tablet.Tablet() - -shard_1_master = tablet.Tablet() -shard_1_replica = tablet.Tablet() -shard_1_rdonly = tablet.Tablet() - -all_tablets = [shard_0_master, shard_0_replica, shard_0_rdonly, - shard_1_master, shard_1_replica, shard_1_rdonly] - - -def setUpModule(): - try: - environment.topo_server().setup() - - setup_procs = [t.init_mysql() for t in all_tablets] - utils.Vtctld().start() - utils.wait_procs(setup_procs) - except: - tearDownModule() - raise - - -def tearDownModule(): - utils.required_teardown() - if utils.options.skip_teardown: - return - - teardown_procs = [t.teardown_mysql() for t in all_tablets] - utils.wait_procs(teardown_procs, raise_on_error=False) - - environment.topo_server().teardown() - utils.kill_sub_processes() - utils.remove_tmp_files() - - for t in all_tablets: - t.remove_tree() - - -class TestCustomSharding(unittest.TestCase): - """Test a custom-shared keyspace.""" - - def _vtdb_conn(self): - protocol, addr = utils.vtgate.rpc_endpoint(python=True) - return vtgate_client.connect(protocol, addr, 30.0) - - def _insert_data(self, shard, start, count, table='data'): - sql = 'insert into ' + table + '(id, name) values (:id, :name)' - conn = self._vtdb_conn() - cursor = conn.cursor( - tablet_type='master', keyspace='test_keyspace', - shards=[shard], - writable=True) - for x in xrange(count): - bindvars = { - 'id': start+x, - 'name': 'row %d' % (start+x), - } - conn.begin() - cursor.execute(sql, bindvars) - conn.commit() - conn.close() - - def _check_data(self, shard, start, count, table='data'): - sql = 'select name from ' + table + ' where id=:id' - conn = self._vtdb_conn() - cursor = conn.cursor( - tablet_type='master', keyspace='test_keyspace', - shards=[shard]) - for x in xrange(count): - bindvars = { - 'id': start+x, - } - cursor.execute(sql, bindvars) - qr = cursor.fetchall() - self.assertEqual(len(qr), 1) - v = qr[0][0] - self.assertEqual(v, 'row %d' % (start+x)) - conn.close() - - def test_custom_end_to_end(self): - """Runs through the common operations of a custom sharded keyspace. - - Tests creation with one shard, schema change, reading / writing - data, adding one more shard, reading / writing data from both - shards, applying schema changes again, and reading / writing data - from both shards again. - """ - - utils.run_vtctl(['CreateKeyspace', 'test_keyspace']) - - # start the first shard only for now - shard_0_master.init_tablet( - 'replica', - keyspace='test_keyspace', - shard='0', - tablet_index=0) - shard_0_replica.init_tablet( - 'replica', - keyspace='test_keyspace', - shard='0', - tablet_index=1) - shard_0_rdonly.init_tablet( - 'rdonly', - keyspace='test_keyspace', - shard='0', - tablet_index=2) - - for t in [shard_0_master, shard_0_replica, shard_0_rdonly]: - t.create_db('vt_test_keyspace') - t.start_vttablet(wait_for_state=None) - - for t in [shard_0_master, shard_0_replica, shard_0_rdonly]: - t.wait_for_vttablet_state('NOT_SERVING') - - utils.run_vtctl(['InitShardMaster', '-force', 'test_keyspace/0', - shard_0_master.tablet_alias], auto_log=True) - utils.wait_for_tablet_type(shard_0_replica.tablet_alias, 'replica') - utils.wait_for_tablet_type(shard_0_rdonly.tablet_alias, 'rdonly') - for t in [shard_0_master, shard_0_replica, shard_0_rdonly]: - t.wait_for_vttablet_state('SERVING') - - self._check_shards_count_in_srv_keyspace(1) - s = utils.run_vtctl_json(['GetShard', 'test_keyspace/0']) - self.assertEqual(s['is_master_serving'], True) - - # create a table on shard 0 - sql = '''create table data( -id bigint auto_increment, -name varchar(64), -primary key (id) -) Engine=InnoDB''' - utils.run_vtctl(['ApplySchema', '-sql=' + sql, 'test_keyspace'], - auto_log=True) - - # reload schema everywhere so the QueryService knows about the tables - for t in [shard_0_master, shard_0_replica, shard_0_rdonly]: - utils.run_vtctl(['ReloadSchema', t.tablet_alias], auto_log=True) - - # create shard 1 - shard_1_master.init_tablet( - 'replica', - keyspace='test_keyspace', - shard='1', - tablet_index=0) - shard_1_replica.init_tablet( - 'replica', - keyspace='test_keyspace', - shard='1', - tablet_index=1) - shard_1_rdonly.init_tablet( - 'rdonly', - keyspace='test_keyspace', - shard='1', - tablet_index=2) - - for t in [shard_1_master, shard_1_replica, shard_1_rdonly]: - t.create_db('vt_test_keyspace') - t.start_vttablet(wait_for_state=None) - - for t in [shard_1_master, shard_1_replica, shard_1_rdonly]: - t.wait_for_vttablet_state('NOT_SERVING') - - s = utils.run_vtctl_json(['GetShard', 'test_keyspace/1']) - self.assertEqual(s['is_master_serving'], True) - - utils.run_vtctl(['InitShardMaster', '-force', 'test_keyspace/1', - shard_1_master.tablet_alias], auto_log=True) - utils.wait_for_tablet_type(shard_1_replica.tablet_alias, 'replica') - utils.wait_for_tablet_type(shard_1_rdonly.tablet_alias, 'rdonly') - for t in [shard_1_master, shard_1_replica, shard_1_rdonly]: - t.wait_for_vttablet_state('SERVING') - utils.run_vtctl(['CopySchemaShard', shard_0_rdonly.tablet_alias, - 'test_keyspace/1'], auto_log=True) - - # we need to rebuild SrvKeyspace here to account for the new shards. - utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True) - self._check_shards_count_in_srv_keyspace(2) - - # must start vtgate after tablets are up, or else wait until 1min refresh - utils.VtGate().start(tablets=[ - shard_0_master, shard_0_replica, shard_0_rdonly, - shard_1_master, shard_1_replica, shard_1_rdonly]) - utils.vtgate.wait_for_endpoints('test_keyspace.0.master', 1) - utils.vtgate.wait_for_endpoints('test_keyspace.0.replica', 1) - utils.vtgate.wait_for_endpoints('test_keyspace.0.rdonly', 1) - utils.vtgate.wait_for_endpoints('test_keyspace.1.master', 1) - utils.vtgate.wait_for_endpoints('test_keyspace.1.replica', 1) - utils.vtgate.wait_for_endpoints('test_keyspace.1.rdonly', 1) - - # insert and check data on shard 0 - self._insert_data('0', 100, 10) - self._check_data('0', 100, 10) - - # insert and check data on shard 1 - self._insert_data('1', 200, 10) - self._check_data('1', 200, 10) - - # create a second table on all shards - sql = '''create table data2( -id bigint auto_increment, -name varchar(64), -primary key (id) -) Engine=InnoDB''' - utils.run_vtctl(['ApplySchema', '-sql=' + sql, 'test_keyspace'], - auto_log=True) - - # reload schema everywhere so the QueryService knows about the tables - for t in all_tablets: - utils.run_vtctl(['ReloadSchema', t.tablet_alias], auto_log=True) - - # insert and read data on all shards - self._insert_data('0', 300, 10, table='data2') - self._insert_data('1', 400, 10, table='data2') - self._check_data('0', 300, 10, table='data2') - self._check_data('1', 400, 10, table='data2') - - # Now test SplitQuery API works (used in MapReduce usually, but bringing - # up a full MR-capable cluster is too much for this test environment) - sql = 'select id, name from data' - s = utils.vtgate.split_query(sql, 'test_keyspace', 4) - self.assertEqual(len(s), 4) - shard0count = 0 - shard1count = 0 - for q in s: - if q['shard_part']['shards'][0] == '0': - shard0count += 1 - if q['shard_part']['shards'][0] == '1': - shard1count += 1 - self.assertEqual(shard0count, 2) - self.assertEqual(shard1count, 2) - - # run the queries, aggregate the results, make sure we have all rows - rows = {} - for q in s: - bindvars = {} - for name, value in q['query']['bind_variables'].iteritems(): - # vtctl encodes bytes as base64. - bindvars[name] = int(base64.standard_b64decode(value['value'])) - qr = utils.vtgate.execute_shards( - q['query']['sql'], - 'test_keyspace', ','.join(q['shard_part']['shards']), - tablet_type='master', bindvars=bindvars) - for r in qr['rows']: - rows[int(r[0])] = r[1] - self.assertEqual(len(rows), 20) - expected = {} - for i in xrange(10): - expected[100 + i] = 'row %d' % (100 + i) - expected[200 + i] = 'row %d' % (200 + i) - self.assertEqual(rows, expected) - - def _check_shards_count_in_srv_keyspace(self, shard_count): - ks = utils.run_vtctl_json(['GetSrvKeyspace', 'test_nj', 'test_keyspace']) - check_types = set([topodata_pb2.MASTER, topodata_pb2.REPLICA, - topodata_pb2.RDONLY]) - for p in ks['partitions']: - if p['served_type'] in check_types: - self.assertEqual(len(p['shard_references']), shard_count) - check_types.remove(p['served_type']) - - self.assertEqual(len(check_types), 0, - 'The number of expected shard_references in GetSrvKeyspace' - ' was not equal %d for all expected tablet types.' - % shard_count) - - -if __name__ == '__main__': - utils.main() diff --git a/test/keyspace_test.py b/test/keyspace_test.py index bc35027adc2..4fedac8f4ef 100755 --- a/test/keyspace_test.py +++ b/test/keyspace_test.py @@ -301,23 +301,15 @@ def test_remove_keyspace_cell(self): utils.run_vtctl( ['InitTablet', '-port=1234', '-keyspace=test_delete_keyspace', '-shard=0', 'test_ca-0000000100', 'master']) - utils.run_vtctl( - ['InitTablet', '-port=1234', '-keyspace=test_delete_keyspace', - '-shard=1', 'test_ca-0000000101', 'master']) utils.run_vtctl( ['InitTablet', '-port=1234', '-keyspace=test_delete_keyspace', '-shard=0', 'test_nj-0000000100', 'replica']) - utils.run_vtctl( - ['InitTablet', '-port=1234', '-keyspace=test_delete_keyspace', - '-shard=1', 'test_nj-0000000101', 'replica']) # Create the serving/replication entries and check that they exist, # so we can later check they're deleted. utils.run_vtctl(['RebuildKeyspaceGraph', 'test_delete_keyspace']) utils.run_vtctl( ['GetShardReplication', 'test_nj', 'test_delete_keyspace/0']) - utils.run_vtctl( - ['GetShardReplication', 'test_nj', 'test_delete_keyspace/1']) utils.run_vtctl(['GetSrvKeyspace', 'test_nj', 'test_delete_keyspace']) utils.run_vtctl(['GetSrvKeyspace', 'test_ca', 'test_delete_keyspace']) @@ -328,13 +320,13 @@ def test_remove_keyspace_cell(self): # Check that the shard is gone from test_nj. srv_keyspace = utils.run_vtctl_json(['GetSrvKeyspace', 'test_nj', 'test_delete_keyspace']) for partition in srv_keyspace['partitions']: - self.assertEqual(len(partition['shard_references']), 1, + self.assertEqual(len(partition['shard_references']), 0, 'RemoveShardCell should have removed one shard from the target cell: ' + json.dumps(srv_keyspace)) # Make sure the shard is still serving in test_ca. srv_keyspace = utils.run_vtctl_json(['GetSrvKeyspace', 'test_ca', 'test_delete_keyspace']) for partition in srv_keyspace['partitions']: - self.assertEqual(len(partition['shard_references']), 2, + self.assertEqual(len(partition['shard_references']), 1, 'RemoveShardCell should not have changed other cells: ' + json.dumps(srv_keyspace)) utils.run_vtctl(['RebuildKeyspaceGraph', 'test_delete_keyspace']) @@ -343,14 +335,11 @@ def test_remove_keyspace_cell(self): utils.run_vtctl(['GetShard', 'test_delete_keyspace/0']) utils.run_vtctl(['GetTablet', 'test_ca-0000000100']) utils.run_vtctl(['GetTablet', 'test_nj-0000000100'], expect_fail=True) - utils.run_vtctl(['GetTablet', 'test_nj-0000000101']) utils.run_vtctl( ['GetShardReplication', 'test_ca', 'test_delete_keyspace/0']) utils.run_vtctl( ['GetShardReplication', 'test_nj', 'test_delete_keyspace/0'], expect_fail=True) - utils.run_vtctl( - ['GetShardReplication', 'test_nj', 'test_delete_keyspace/1']) utils.run_vtctl(['GetSrvKeyspace', 'test_nj', 'test_delete_keyspace']) # Add it back to do another test. @@ -372,9 +361,6 @@ def test_remove_keyspace_cell(self): utils.run_vtctl( ['GetShardReplication', 'test_nj', 'test_delete_keyspace/0'], expect_fail=True) - utils.run_vtctl( - ['GetShardReplication', 'test_nj', 'test_delete_keyspace/1'], - expect_fail=True) # Clean up. utils.run_vtctl(['DeleteKeyspace', '-recursive', 'test_delete_keyspace'])