Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions go/vt/topo/cell_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package topo

import (
"path"
"strings"

"context"

Expand Down Expand Up @@ -171,3 +172,41 @@ func (ts *Server) GetKnownCells(ctx context.Context) ([]string, error) {
}
return DirEntriesToStringArray(entries), nil
}

// ExpandCells takes a comma-separated list of cells and returns an array of cell names
// Aliases are expanded and an empty string returns all cells
func (ts *Server) ExpandCells(ctx context.Context, cells string) ([]string, error) {
var err error
var outputCells []string
inputCells := strings.Split(cells, ",")
if cells == "" {
inputCells, err = ts.GetCellInfoNames(ctx)
if err != nil {
return nil, err
}
}

for _, cell := range inputCells {
cell2 := strings.TrimSpace(cell)
shortCtx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout)
defer cancel()
_, err := ts.GetCellInfo(shortCtx, cell2, false)
if err != nil {
// not a valid cell, check whether it is a cell alias
shortCtx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout)
defer cancel()
alias, err2 := ts.GetCellsAlias(shortCtx, cell2, false)
// if we get an error, either cellAlias doesn't exist or it isn't a cell alias at all. Ignore and continue
if err2 == nil {
outputCells = append(outputCells, alias.Cells...)
}
if err != nil {
return nil, err
}
} else {
// valid cell, add it to our list
outputCells = append(outputCells, cell2)
}
}
return outputCells, nil
}
19 changes: 19 additions & 0 deletions go/vt/topo/srv_keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,3 +701,22 @@ func ShardIsServing(srvKeyspace *topodatapb.SrvKeyspace, shard *topodatapb.Shard
}
return false
}

// ValidateSrvKeyspace validates that the SrvKeyspace for given keyspace in the provided cells is not corrupted
func (ts *Server) ValidateSrvKeyspace(ctx context.Context, keyspace, cells string) error {
cellsToValidate, err := ts.ExpandCells(ctx, cells)
if err != nil {
return err
}
for _, cell := range cellsToValidate {
srvKeyspace, err := ts.GetSrvKeyspace(ctx, cell, keyspace)
if err != nil {
return err
}
err = OrderAndCheckPartitions(cell, srvKeyspace)
if err != nil {
return err
}
}
return nil
}
53 changes: 53 additions & 0 deletions go/vt/topo/topotests/cell_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package topotests

import (
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/require"

"context"

"vitess.io/vitess/go/vt/topo"
Expand All @@ -44,6 +47,11 @@ func TestCellInfo(t *testing.T) {
t.Fatalf("unexpected CellInfo: %v", ci)
}

var cells []string
cells, err = ts.ExpandCells(ctx, cell)
require.NoError(t, err)
require.EqualValues(t, []string{"cell1"}, cells)

// Update the Server Address.
if err := ts.UpdateCellInfoFields(ctx, cell, func(ci *topodatapb.CellInfo) error {
ci.ServerAddress = "new address"
Expand Down Expand Up @@ -124,3 +132,48 @@ func TestCellInfo(t *testing.T) {
t.Fatalf("GetCellInfo(non-existing cell) failed: %v", err)
}
}

func TestExpandCells(t *testing.T) {
ctx := context.Background()
var cells []string
var err error
var allCells = "cell1,cell2,cell3"
type testCase struct {
name string
cellsIn string
cellsOut []string
errString string
}

testCases := []testCase{
{"single", "cell1", []string{"cell1"}, ""},
{"multiple", "cell1,cell2,cell3", []string{"cell1", "cell2", "cell3"}, ""},
{"empty", "", []string{"cell1", "cell2", "cell3"}, ""},
{"bad", "unknown", nil, "node doesn't exist"},
}

for _, tCase := range testCases {
t.Run(tCase.name, func(t *testing.T) {
cellsIn := tCase.cellsIn
if cellsIn == "" {
cellsIn = allCells
}
topoCells := strings.Split(cellsIn, ",")
var ts *topo.Server
if tCase.name == "bad" {
ts = memorytopo.NewServer()
} else {
ts = memorytopo.NewServer(topoCells...)
}
cells, err = ts.ExpandCells(ctx, cellsIn)
if tCase.errString != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tCase.errString)
} else {
require.NoError(t, err)
}
require.EqualValues(t, tCase.cellsOut, cells)
})
}

}
70 changes: 70 additions & 0 deletions go/vt/topo/topotests/srv_keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"context"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -1170,3 +1172,71 @@ func TestMasterMigrateServedType(t *testing.T) {
t.Errorf("MigrateServedType() failure. Got %v, want: %v", string(got), string(want))
}
}

func TestValidateSrvKeyspace(t *testing.T) {
cell := "cell1"
cell2 := "cell2"
keyspace := "ks1"
ctx := context.Background()
ts := memorytopo.NewServer(cell, cell2)

leftKeyRange, err := key.ParseShardingSpec("-80")
if err != nil || len(leftKeyRange) != 1 {
t.Fatalf("ParseShardingSpec failed. Expected non error and only one element. Got err: %v, len(%v)", err, len(leftKeyRange))
}

rightKeyRange, err := key.ParseShardingSpec("80-")
if err != nil || len(rightKeyRange) != 1 {
t.Fatalf("ParseShardingSpec failed. Expected non error and only one element. Got err: %v, len(%v)", err, len(rightKeyRange))
}

correct := &topodatapb.SrvKeyspace{
Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{
{
ServedType: topodatapb.TabletType_MASTER,
ShardReferences: []*topodatapb.ShardReference{
{
Name: "-80",
KeyRange: leftKeyRange[0],
},
{
Name: "80-",
KeyRange: rightKeyRange[0],
},
},
},
},
}

incorrect := &topodatapb.SrvKeyspace{
Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{
{
ServedType: topodatapb.TabletType_MASTER,
ShardReferences: []*topodatapb.ShardReference{
{
Name: "80-",
KeyRange: rightKeyRange[0],
},
},
},
},
}

if err := ts.UpdateSrvKeyspace(ctx, cell, keyspace, correct); err != nil {
t.Fatalf("UpdateSrvKeyspace() failed: %v", err)
}

if err := ts.UpdateSrvKeyspace(ctx, cell2, keyspace, incorrect); err != nil {
t.Fatalf("UpdateSrvKeyspace() failed: %v", err)
}
errMsg := "keyspace partition for MASTER in cell cell2 does not start with min key"
err = ts.ValidateSrvKeyspace(ctx, keyspace, "cell1,cell2")
require.EqualError(t, err, errMsg)

err = ts.ValidateSrvKeyspace(ctx, keyspace, "cell1")
require.NoError(t, err)
err = ts.ValidateSrvKeyspace(ctx, keyspace, "cell2")
require.EqualError(t, err, errMsg)
err = ts.ValidateSrvKeyspace(ctx, keyspace, "")
require.EqualError(t, err, errMsg)
}
9 changes: 8 additions & 1 deletion go/vt/wrangler/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ limitations under the License.
package wrangler

import (
"context"
"fmt"
"sync"
"time"

"context"
"vitess.io/vitess/go/vt/log"

"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
Expand Down Expand Up @@ -67,6 +68,12 @@ func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sour
if err := wr.validateNewWorkflow(ctx, keyspace, workflow); err != nil {
return err
}
if err := wr.ts.ValidateSrvKeyspace(ctx, keyspace, cell); err != nil {
err2 := vterrors.Wrapf(err, "SrvKeyspace for keyspace %s is corrupt in cell %s", keyspace, cell)
log.Errorf("%w", err2)
return err2
}

rs, err := wr.buildResharder(ctx, keyspace, workflow, sources, targets, cell, tabletTypes)
if err != nil {
return vterrors.Wrap(err, "buildResharder")
Expand Down
37 changes: 35 additions & 2 deletions go/vt/wrangler/resharder_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
"sync"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/key"

"context"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -55,7 +59,36 @@ var (
//----------------------------------------------
// testResharderEnv

func newTestResharderEnv(sources, targets []string) *testResharderEnv {
func getPartition(t *testing.T, shards []string) *topodatapb.SrvKeyspace_KeyspacePartition {
partition := &topodatapb.SrvKeyspace_KeyspacePartition{
ServedType: topodatapb.TabletType_MASTER,
ShardReferences: []*topodatapb.ShardReference{},
}
for _, shard := range shards {
keyRange, err := key.ParseShardingSpec(shard)
require.NoError(t, err)
require.Equal(t, 1, len(keyRange))
partition.ShardReferences = append(partition.ShardReferences, &topodatapb.ShardReference{
Name: shard,
KeyRange: keyRange[0],
})
}
return partition
}
func initTopo(t *testing.T, topo *topo.Server, keyspace string, sources, targets, cells []string) {
ctx := context.Background()
srvKeyspace := &topodatapb.SrvKeyspace{
Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{},
}
srvKeyspace.Partitions = append(srvKeyspace.Partitions, getPartition(t, sources))
srvKeyspace.Partitions = append(srvKeyspace.Partitions, getPartition(t, targets))
for _, cell := range cells {
topo.UpdateSrvKeyspace(ctx, cell, keyspace, srvKeyspace)
}
topo.ValidateSrvKeyspace(ctx, keyspace, strings.Join(cells, ","))
}

func newTestResharderEnv(t *testing.T, sources, targets []string) *testResharderEnv {
env := &testResharderEnv{
keyspace: "ks",
workflow: "resharderTest",
Expand All @@ -67,7 +100,7 @@ func newTestResharderEnv(sources, targets []string) *testResharderEnv {
tmc: newTestResharderTMClient(),
}
env.wr = New(logutil.NewConsoleLogger(), env.topoServ, env.tmc)

initTopo(t, env.topoServ, "ks", sources, targets, []string{"cell"})
tabletID := 100
for _, shard := range sources {
_ = env.addTablet(tabletID, env.keyspace, shard, topodatapb.TabletType_MASTER)
Expand Down
Loading