Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ var (
// refreshKnownTablets tells us whether to process all tablets or only new tablets.
refreshKnownTablets = true

// topoReadConcurrency tells us how many topo reads are allowed in parallel.
topoReadConcurrency int64 = 32

// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond

Expand All @@ -107,11 +104,6 @@ const (
DefaultHealthCheckRetryDelay = 5 * time.Second
DefaultHealthCheckTimeout = 1 * time.Minute

// DefaultTopoReadConcurrency is used as the default value for the topoReadConcurrency parameter of a TopologyWatcher.
DefaultTopoReadConcurrency int = 5
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. This was unused 🤦

// DefaultTopologyWatcherRefreshInterval is used as the default value for
// the refresh interval of a topology watcher.
DefaultTopologyWatcherRefreshInterval = 1 * time.Minute
// healthCheckTemplate is the HTML code to display a TabletsCacheStatusList, it takes a parameter for the title
// as the template can be used for both HealthCheck's cache and healthy tablets list.
healthCheckTemplate = `
Expand Down Expand Up @@ -176,7 +168,6 @@ func registerWebUIFlags(fs *pflag.FlagSet) {
fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.")
fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.")
fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.")
fs.Int64Var(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.")
ParseTabletURLTemplateFromFlag()
}

Expand Down Expand Up @@ -362,7 +353,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
4 changes: 2 additions & 2 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type TopologyWatcher struct {
cell string
refreshInterval time.Duration
refreshKnownTablets bool
concurrency int64
concurrency int
ctx context.Context
cancelFunc context.CancelFunc
// wg keeps track of all launched Go routines.
Expand All @@ -92,7 +92,7 @@ type TopologyWatcher struct {

// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and reloads them as needed.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int64) *TopologyWatcher {
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
healthcheck: hc,
Expand Down
14 changes: 5 additions & 9 deletions go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,24 +107,20 @@ func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error {
return nil
}
exec.keyspace = keyspace
shardNames, err := exec.ts.GetShardNames(ctx, keyspace)
shards, err := exec.ts.FindAllShardsInKeyspace(ctx, keyspace, nil)
if err != nil {
return fmt.Errorf("unable to get shard names for keyspace: %s, error: %v", keyspace, err)
return fmt.Errorf("unable to get shards for keyspace: %s, error: %v", keyspace, err)
}
exec.tablets = make([]*topodatapb.Tablet, len(shardNames))
for i, shardName := range shardNames {
shardInfo, err := exec.ts.GetShard(ctx, keyspace, shardName)
if err != nil {
return fmt.Errorf("unable to get shard info, keyspace: %s, shard: %s, error: %v", keyspace, shardName, err)
}
exec.tablets = make([]*topodatapb.Tablet, 0, len(shards))
for shardName, shardInfo := range shards {
if !shardInfo.HasPrimary() {
return fmt.Errorf("shard: %s does not have a primary", shardName)
}
tabletInfo, err := exec.ts.GetTablet(ctx, shardInfo.PrimaryAlias)
if err != nil {
return fmt.Errorf("unable to get primary tablet info, keyspace: %s, shard: %s, error: %v", keyspace, shardName, err)
}
exec.tablets[i] = tabletInfo.Tablet
exec.tablets = append(exec.tablets, tabletInfo.Tablet)
}

if len(exec.tablets) == 0 {
Expand Down
92 changes: 79 additions & 13 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ package topo
import (
"context"
"path"
"sort"
"sync"

"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/event"
Expand All @@ -34,7 +38,20 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// This file contains keyspace utility functions
// This file contains keyspace utility functions.

// DefaultConcurrency is the default concurrency to use in order to avoid overwhelming the topo server.
var DefaultConcurrency = 32

// registerFlags adds the topo read concurrency flag to the given flag set,
// binding it to DefaultConcurrency and using its current value as the default.
func registerFlags(fs *pflag.FlagSet) {
	const (
		flagName  = "topo_read_concurrency"
		flagUsage = "Concurrency of topo reads."
	)
	fs.IntVar(&DefaultConcurrency, flagName, DefaultConcurrency, flagUsage)
}

// init hooks registerFlags into flag parsing for each binary that exposes
// the topo read concurrency flag.
func init() {
	for _, cmd := range []string{"vtcombo", "vtctld", "vtgate"} {
		servenv.OnParseFor(cmd, registerFlags)
	}
}

// KeyspaceInfo is a meta struct that contains metadata to give the
// data more context and convenience. This is the main way we interact
Expand Down Expand Up @@ -188,12 +205,60 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,
opt = &FindAllShardsInKeyspaceOptions{}
}
if opt.Concurrency <= 0 {
opt.Concurrency = 1
opt.Concurrency = DefaultConcurrency
}

// First try to get all shards using List if we can.
buildResultFromList := func(kvpairs []KVInfo) (map[string]*ShardInfo, error) {
result := make(map[string]*ShardInfo, len(kvpairs))
for _, entry := range kvpairs {
// The shard key looks like this: /vitess/global/keyspaces/commerce/shards/-80/Shard
shardKey := string(entry.Key)
shardName := path.Base(path.Dir(shardKey)) // The base part of the dir is "-80"
// Validate the extracted shard name.
if _, _, err := ValidateShardName(shardName); err != nil {
return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): unexpected shard key/path %q contains invalid shard name/range %q",
keyspace, shardKey, shardName)
}
shard := &topodatapb.Shard{}
if err := shard.UnmarshalVT(entry.Value); err != nil {
return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): invalid data found for shard %q in %q",
keyspace, shardName, shardKey)
}
result[shardName] = &ShardInfo{
keyspace: keyspace,
shardName: shardName,
version: entry.Version,
Shard: shard,
}
}
return result, nil
}
shardsPath := path.Join(KeyspacesPath, keyspace, ShardsPath)
listRes, err := ts.globalCell.List(ctx, shardsPath)
if err == nil { // We have everything we need to build the result
return buildResultFromList(listRes)
}
if IsErrType(err, NoNode) {
// The path doesn't exist, let's see if the keyspace exists.
if _, kerr := ts.GetKeyspace(ctx, keyspace); kerr != nil {
return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): List", keyspace)
}
// We simply have no shards.
return make(map[string]*ShardInfo, 0), nil
}
// Currently the ZooKeeper implementation does not support index prefix
// scans so we fall back to concurrently fetching the shards one by one.
// It is also possible that the response containing all shards is too
// large in which case we also fall back to the one by one fetch.
if !IsErrType(err, NoImplementation) && !IsErrType(err, ResourceExhausted) {
return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): List", keyspace)
}

// Fall back to the shard by shard method.
shards, err := ts.GetShardNames(ctx, keyspace)
if err != nil {
return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace)
return nil, vterrors.Wrapf(err, "failed to get list of shard names for keyspace '%s'", keyspace)
}

// Keyspaces with a large number of shards and geographically distributed
Expand All @@ -213,7 +278,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,
)

eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(opt.Concurrency)
eg.SetLimit(int(opt.Concurrency))

for _, shard := range shards {
shard := shard
Expand All @@ -222,7 +287,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,
si, err := ts.GetShard(ctx, keyspace, shard)
switch {
case IsErrType(err, NoNode):
log.Warningf("GetShard(%v, %v) returned ErrNoNode, consider checking the topology.", keyspace, shard)
log.Warningf("GetShard(%s, %s) returned ErrNoNode, consider checking the topology.", keyspace, shard)
return nil
case err == nil:
mu.Lock()
Expand All @@ -231,7 +296,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,

return nil
default:
return vterrors.Wrapf(err, "GetShard(%v, %v) failed", keyspace, shard)
return vterrors.Wrapf(err, "GetShard(%s, %s) failed", keyspace, shard)
}
})
}
Expand All @@ -245,25 +310,26 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,

// GetServingShards returns all shards where the primary is serving.
func (ts *Server) GetServingShards(ctx context.Context, keyspace string) ([]*ShardInfo, error) {
shards, err := ts.GetShardNames(ctx, keyspace)
shards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, nil)
if err != nil {
return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace)
}

result := make([]*ShardInfo, 0, len(shards))
for _, shard := range shards {
si, err := ts.GetShard(ctx, keyspace, shard)
if err != nil {
return nil, vterrors.Wrapf(err, "GetShard(%v, %v) failed", keyspace, shard)
}
if !si.IsPrimaryServing {
if !shard.IsPrimaryServing {
continue
}
result = append(result, si)
result = append(result, shard)
}
if len(result) == 0 {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%v has no serving shards", keyspace)
}
// Sort the shards by KeyRange for deterministic results.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

sort.Slice(result, func(i, j int) bool {
return key.KeyRangeLess(result[i].KeyRange, result[j].KeyRange)
})

return result, nil
}

Expand Down
96 changes: 95 additions & 1 deletion go/vt/topo/keyspace_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ package topo_test

import (
"context"
"fmt"
"slices"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/key"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

func TestServerFindAllShardsInKeyspace(t *testing.T) {
Expand Down Expand Up @@ -87,3 +90,94 @@ func TestServerFindAllShardsInKeyspace(t *testing.T) {
})
}
}

// TestServerGetServingShards creates a keyspace with a varying number of
// shards in an in-memory topo server and verifies that GetServingShards
// returns every shard (or the expected error when there are none). It also
// checks the topo call counts so that we know whether the single List call
// was sufficient or whether the shard by shard fallback was used.
func TestServerGetServingShards(t *testing.T) {
	keyspace := "ks1"
	errNoListImpl := topo.NewError(topo.NoImplementation, "don't be doing no listing round here")

	tests := []struct {
		shards   int    // Number of shards to create
		err      string // Error message we expect, if any
		fallback bool   // Should we fallback to the shard by shard method
	}{
		{
			shards: 0,
			err:    fmt.Sprintf("%s has no serving shards", keyspace),
		},
		{
			shards: 2,
		},
		{
			shards: 128,
		},
		{
			shards:   512,
			fallback: true,
		},
		{
			shards: 1024,
		},
	}

	for _, tt := range tests {
		t.Run(fmt.Sprintf("%d shards with fallback = %t", tt.shards, tt.fallback), func(t *testing.T) {
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()
			ts, factory := memorytopo.NewServerAndFactory(ctx)
			defer ts.Close()
			stats := factory.GetCallStats()
			require.NotNil(t, stats)

			if tt.fallback {
				// Force List to fail so that the shard by shard path is taken.
				factory.SetListError(errNoListImpl)
			}

			err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})
			require.NoError(t, err)
			var shardNames []string
			if tt.shards > 0 {
				shardNames, err = key.GenerateShardRanges(tt.shards)
				require.NoError(t, err)
				for _, shardName := range shardNames {
					err = ts.CreateShard(ctx, keyspace, shardName)
					require.NoError(t, err)
				}
			}

			// Verify that we return a complete list of shards and that each
			// key range is present in the output.
			stats.ResetAll() // We only want the stats for GetServingShards
			shardInfos, err := ts.GetServingShards(ctx, keyspace)
			if tt.err != "" {
				// Do not return early here: the call counts below must also
				// be verified for the error case, otherwise the zero shard
				// expectations would never be exercised.
				require.EqualError(t, err, tt.err)
			} else {
				require.NoError(t, err)
				require.Len(t, shardInfos, tt.shards)
				for _, shardName := range shardNames {
					f := func(si *topo.ShardInfo) bool {
						return key.KeyRangeString(si.Shard.KeyRange) == shardName
					}
					require.True(t, slices.ContainsFunc(shardInfos, f), "shard %q was not found in the results",
						shardName)
				}
			}

			// Now we check the stats based on the number of shards and whether or not
			// we should have had a List error and fell back to the shard by shard method.
			callcounts := stats.Counts()
			require.NotNil(t, callcounts)
			require.Equal(t, int64(1), callcounts["List"]) // We should always try
			switch {
			case tt.fallback: // We get the shards one by one from the list
				require.Equal(t, int64(1), callcounts["ListDir"])     // GetShardNames
				require.Equal(t, int64(tt.shards), callcounts["Get"]) // GetShard
			case tt.shards < 1: // We use a Get to check that the keyspace exists
				require.Equal(t, int64(0), callcounts["ListDir"])
				require.Equal(t, int64(1), callcounts["Get"])
			default: // We should not make any ListDir or Get calls
				require.Equal(t, int64(0), callcounts["ListDir"])
				require.Equal(t, int64(0), callcounts["Get"])
			}
		})
	}
}
2 changes: 2 additions & 0 deletions go/vt/topo/memorytopo/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (

// ListDir is part of the topo.Conn interface.
func (c *Conn) ListDir(ctx context.Context, dirPath string, full bool) ([]topo.DirEntry, error) {
c.factory.callstats.Add([]string{"ListDir"}, 1)

if err := c.dial(ctx); err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion go/vt/topo/memorytopo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"vitess.io/vitess/go/vt/topo"
)

// NewLeaderParticipation is part of the topo.Server interface
// NewLeaderParticipation is part of the topo.Conn interface.
func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) {
c.factory.callstats.Add([]string{"NewLeaderParticipation"}, 1)

if c.closed {
return nil, ErrConnectionClosed
}
Expand Down
Loading