Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
1 change: 1 addition & 0 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func init() {
servenv.OnParseFor("vtcombo", registerFlags)
servenv.OnParseFor("vtctld", registerFlags)
servenv.OnParseFor("vtgate", registerFlags)
servenv.OnParseFor("vtorc", registerFlags)
}

// KeyspaceInfo is a meta struct that contains metadata to give the
Expand Down
89 changes: 83 additions & 6 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ import (
"sync"
"time"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo/events"
"vitess.io/vitess/go/vt/topo/topoproto"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vterrors"
)

const (
Expand Down Expand Up @@ -644,6 +644,83 @@ func (ts *Server) FindAllTabletAliasesInShardByCell(ctx context.Context, keyspac
return result, err
}

// GetTabletsByShard returns the tablets in the given shard using all cells.
// It can return ErrPartialResult if it couldn't read all the cells, or all
// the individual tablets, in which case the result is valid, but partial.
func (ts *Server) GetTabletsByShard(ctx context.Context, keyspace, shard string) ([]*TabletInfo, error) {
return ts.GetTabletsByShardCell(ctx, keyspace, shard, nil)
}

// GetTabletsByShardCell returns the tablets in the given shard. It can return
// ErrPartialResult if it couldn't read all the cells, or all the individual
// tablets, in which case the result is valid, but partial.
func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard string, cells []string) ([]*TabletInfo, error) {
span, ctx := trace.NewSpan(ctx, "topo.GetTabletsByShardCell")
span.Annotate("keyspace", keyspace)
span.Annotate("shard", shard)
span.Annotate("num_cells", len(cells))
defer span.Finish()
ctx = trace.NewContext(ctx, span)
var err error

if len(cells) == 0 {
cells, err = ts.GetCellInfoNames(ctx)
if err != nil {
return nil, err
}
}

// divide the concurrency limit by the number of cells. if there are more
// cells than the limit, default to concurrency of 1. A semaphore ensures
// the limit is not exceeded in this scenario.
var getConcurrency int
var sem *semaphore.Weighted
if len(cells) > DefaultConcurrency {
sem = semaphore.NewWeighted(int64(DefaultConcurrency))
getConcurrency = 1
} else {
getConcurrency = DefaultConcurrency / len(cells)
}

wg := sync.WaitGroup{}
mutex := sync.Mutex{}
rec := concurrency.AllErrorRecorder{}
tablets := make([]*TabletInfo, 0)
for _, cell := range cells {
wg.Add(1)
go func() {
if sem != nil {
if err := sem.Acquire(ctx, 1); err != nil {
rec.RecordError(vterrors.Wrap(err, fmt.Sprintf("GetTabletsByCell for %v failed.", cell)))
return
}
defer sem.Release(1)
}

t, err := ts.GetTabletsByCell(ctx, cell, &GetTabletsByCellOptions{
Concurrency: getConcurrency,
Keyspace: keyspace,
Shard: shard,
})
if err != nil {
rec.RecordError(vterrors.Wrap(err, fmt.Sprintf("GetTabletsByCell for %v failed.", cell)))
return
}
mutex.Lock()
tablets = append(tablets, t...)
mutex.Unlock()
}()
}
wg.Wait()
err = nil
if rec.HasErrors() {
log.Warningf("GetTabletsByShardCell(%v,%v): got partial result: %v", keyspace, shard, rec.Error())
err = NewError(PartialResult, shard)
}

return tablets, err
}

// GetTabletMapForShard returns the tablets for a shard. It can return
// ErrPartialResult if it couldn't read all the cells, or all
// the individual tablets, in which case the map is valid, but partial.
Expand Down
16 changes: 14 additions & 2 deletions go/vt/topo/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t
type GetTabletsByCellOptions struct {
// Concurrency controls the maximum number of concurrent calls to GetTablet.
Concurrency int
// Keyspace is the optional keyspace tablets must match.
Keyspace string
// Shard is the optional shard tablets must match.
Shard string
}

// GetTabletsByCell returns all the tablets in the cell.
Expand Down Expand Up @@ -316,13 +320,21 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *G
return nil, err
}

tablets := make([]*TabletInfo, len(listResults))
tablets := make([]*TabletInfo, 0)
for n := range listResults {
tablet := &topodatapb.Tablet{}
if err := tablet.UnmarshalVT(listResults[n].Value); err != nil {
return nil, err
}
tablets[n] = &TabletInfo{Tablet: tablet, version: listResults[n].Version}
if opt != nil && opt.Keyspace != "" {
if opt.Keyspace != tablet.Keyspace {
continue
}
if opt.Shard != "" && opt.Shard != tablet.Shard {
continue
}
}
tablets = append(tablets, &TabletInfo{Tablet: tablet, version: listResults[n].Version})
}

return tablets, nil
Expand Down
111 changes: 71 additions & 40 deletions go/vt/topo/tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,43 +34,63 @@ import (
// GetTabletsByCell first tries to get all the tablets using List.
// If the response is too large, we will get an error, and fall back to one tablet at a time.
func TestServerGetTabletsByCell(t *testing.T) {
const cell = "zone1"
const keyspace = "keyspace"
const shard = "shard"

tests := []struct {
name string
tablets int
opt *topo.GetTabletsByCellOptions
listError error
name string
createShardTablets int
opt *topo.GetTabletsByCellOptions
listError error
keyspaceShards map[string]string
}{
{
name: "negative concurrency",
tablets: 1,
name: "negative concurrency",
keyspaceShards: map[string]string{keyspace: shard},
createShardTablets: 1,
// Ensure this doesn't panic.
opt: &topo.GetTabletsByCellOptions{Concurrency: -1},
},
{
name: "single",
tablets: 1,
name: "single",
keyspaceShards: map[string]string{keyspace: shard},
createShardTablets: 1,
// Make sure the defaults apply as expected.
opt: nil,
},
{
name: "multiple",
name: "multiple",
keyspaceShards: map[string]string{keyspace: shard},
// should work with more than 1 tablet
tablets: 32,
opt: &topo.GetTabletsByCellOptions{Concurrency: 8},
createShardTablets: 32,
opt: &topo.GetTabletsByCellOptions{Concurrency: 8},
},
{
name: "multiple with list error",
name: "multiple with list error",
keyspaceShards: map[string]string{keyspace: shard},
// should work with more than 1 tablet when List returns an error
tablets: 32,
opt: &topo.GetTabletsByCellOptions{Concurrency: 8},
listError: topo.NewError(topo.ResourceExhausted, ""),
createShardTablets: 32,
opt: &topo.GetTabletsByCellOptions{Concurrency: 8},
listError: topo.NewError(topo.ResourceExhausted, ""),
},
{
name: "filtered by keyspace and shard",
keyspaceShards: map[string]string{
keyspace: shard,
"filtered": "-",
},
// should create 2 tablets in 2 different shards (4 total)
// but only a single shard is returned
createShardTablets: 2,
opt: &topo.GetTabletsByCellOptions{
Concurrency: 1,
Keyspace: keyspace,
Shard: shard,
},
},
}

const cell = "zone1"
const keyspace = "keyspace"
const shard = "shard"

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -84,38 +104,49 @@ func TestServerGetTabletsByCell(t *testing.T) {

// Create an ephemeral keyspace and generate shard records within
// the keyspace to fetch later.
require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}))
require.NoError(t, ts.CreateShard(ctx, keyspace, shard))

tablets := make([]*topo.TabletInfo, tt.tablets)

for i := 0; i < tt.tablets; i++ {
tablet := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: cell,
Uid: uint32(i),
},
Hostname: "host1",
PortMap: map[string]int32{
"vt": int32(i),
},
Keyspace: keyspace,
Shard: shard,
for k, s := range tt.keyspaceShards {
require.NoError(t, ts.CreateKeyspace(ctx, k, &topodatapb.Keyspace{}))
require.NoError(t, ts.CreateShard(ctx, k, s))
}

tablets := make([]*topo.TabletInfo, tt.createShardTablets)

var uid uint32 = 1
for k, s := range tt.keyspaceShards {
for i := 0; i < tt.createShardTablets; i++ {
tablet := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: cell,
Uid: uid,
},
Hostname: "host1",
PortMap: map[string]int32{
"vt": int32(uid),
},
Keyspace: k,
Shard: s,
}
tInfo := &topo.TabletInfo{Tablet: tablet}
tablets[i] = tInfo
require.NoError(t, ts.CreateTablet(ctx, tablet))
uid++
}
tInfo := &topo.TabletInfo{Tablet: tablet}
tablets[i] = tInfo
require.NoError(t, ts.CreateTablet(ctx, tablet))
}

// Verify that we return a complete list of tablets and that each
// tablet matches what we expect.
out, err := ts.GetTabletsByCell(ctx, cell, tt.opt)
require.NoError(t, err)
require.Len(t, out, tt.tablets)
require.Len(t, out, tt.createShardTablets)

for i, tab := range tablets {
require.Equal(t, tab.Tablet, tablets[i].Tablet)
}

for _, tablet := range out {
require.Equal(t, keyspace, tablet.Keyspace)
require.Equal(t, shard, tablet.Shard)
}
})
}
}
Expand Down
7 changes: 3 additions & 4 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/reparentutil"
"vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/db"
Expand Down Expand Up @@ -203,7 +202,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
}

func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) {
tablets, err := topotools.GetTabletMapForCell(ctx, ts, cell)
tablets, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{Concurrency: topo.DefaultConcurrency})
if err != nil {
log.Errorf("Error fetching topo info for cell %v: %v", cell, err)
return
Expand Down Expand Up @@ -235,7 +234,7 @@ func refreshTabletInfoOfShard(ctx context.Context, keyspace, shard string) {
}

func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) {
tablets, err := ts.GetTabletMapForShard(ctx, keyspace, shard)
tablets, err := ts.GetTabletsByShard(ctx, keyspace, shard)
if err != nil {
log.Errorf("Error fetching tablets for keyspace/shard %v/%v: %v", keyspace, shard, err)
return
Expand All @@ -245,7 +244,7 @@ func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string,
refreshTablets(tablets, query, args, loader, forceRefresh, tabletsToIgnore)
}

func refreshTablets(tablets map[string]*topo.TabletInfo, query string, args []any, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) {
func refreshTablets(tablets []*topo.TabletInfo, query string, args []any, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) {
// Discover new tablets.
latestInstances := make(map[string]bool)
var wg sync.WaitGroup
Expand Down