Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions go/vt/vtorc/inst/shard_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,6 @@ func ReadShardNames(keyspaceName string) (shardNames []string, err error) {
return shardNames, err
}

// ReadAllShardNames reads the names of all vitess shards by keyspace.
func ReadAllShardNames() (shardNames map[string][]string, err error) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced by new ReadTabletCountsByKeyspaceShard

shardNames = make(map[string][]string)
query := `select keyspace, shard from vitess_shard`
err = db.QueryVTOrc(query, nil, func(row sqlutils.RowMap) error {
ks := row.GetString("keyspace")
shardNames[ks] = append(shardNames[ks], row.GetString("shard"))
return nil
})
return shardNames, err
}

// ReadShardPrimaryInformation reads the vitess shard record and gets the shard primary alias and timestamp.
func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias string, primaryTimestamp string, err error) {
if err = topo.ValidateKeyspaceName(keyspaceName); err != nil {
Expand Down
7 changes: 0 additions & 7 deletions go/vt/vtorc/inst/shard_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,6 @@ func TestSaveReadAndDeleteShard(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []string{tt.shardName}, shardNames)

// ReadAllShardNames
allShardNames, err := ReadAllShardNames()
require.NoError(t, err)
ksShards, found := allShardNames[tt.keyspaceName]
require.True(t, found)
require.Equal(t, []string{tt.shardName}, ksShards)

// DeleteShard
require.NoError(t, DeleteShard(tt.keyspaceName, tt.shardName))
_, _, err = ReadShardPrimaryInformation(tt.keyspaceName, tt.shardName)
Expand Down
25 changes: 25 additions & 0 deletions go/vt/vtorc/inst/tablet_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,31 @@ func ReadTabletCountsByCell() (map[string]int64, error) {
return tabletCounts, err
}

// ReadTabletCountsByKeyspaceShard returns the count of tablets watched by keyspace/shard.
// The backend query uses an index by "keyspace, shard": ks_idx_vitess_tablet.
func ReadTabletCountsByKeyspaceShard() (map[string]map[string]int64, error) {
tabletCounts := make(map[string]map[string]int64)
query := `SELECT
keyspace,
shard,
COUNT() AS count
FROM
vitess_tablet
GROUP BY
keyspace,
shard`
err := db.QueryVTOrc(query, nil, func(row sqlutils.RowMap) error {
keyspace := row.GetString("keyspace")
shard := row.GetString("shard")
if _, found := tabletCounts[keyspace]; !found {
tabletCounts[keyspace] = make(map[string]int64)
}
tabletCounts[keyspace][shard] = row.GetInt64("count")
return nil
})
return tabletCounts, err
}

// SaveTablet saves the tablet record against the instanceKey.
func SaveTablet(tablet *topodatapb.Tablet) error {
tabletp, err := prototext.Marshal(tablet)
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vtorc/inst/tablet_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,35 @@ func TestReadTabletCountsByCell(t *testing.T) {
require.NoError(t, err)
require.Equal(t, map[string]int64{"cell1": 100}, tabletCounts)
}

func TestReadTabletCountsByKeyspaceShard(t *testing.T) {
// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
defer func() {
db.ClearVTOrcDatabase()
}()

var uid uint32
for _, shard := range []string{"-40", "40-80", "80-c0", "c0-"} {
for i := 0; i < 100; i++ {
require.NoError(t, SaveTablet(&topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "cell1",
Uid: uid,
},
Keyspace: "test",
Shard: shard,
}))
uid++
}
}
tabletCounts, err := ReadTabletCountsByKeyspaceShard()
require.NoError(t, err)
require.Equal(t, map[string]map[string]int64{
"test": {
"-40": 100,
"40-80": 100,
"80-c0": 100,
"c0-": 100,
},
}, tabletCounts)
}
22 changes: 0 additions & 22 deletions go/vt/vtorc/logic/keyspace_shard_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,13 @@ import (
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtorc/inst"
)

var statsShardsWatched = stats.NewGaugesFuncWithMultiLabels("ShardsWatched",
"Keyspace/shards currently watched",
[]string{"Keyspace", "Shard"},
getShardsWatchedStats)

// getShardsWatchedStats returns the keyspace/shards watched in a format for stats.
func getShardsWatchedStats() map[string]int64 {
shardsWatched := make(map[string]int64)
allShardNames, err := inst.ReadAllShardNames()
if err != nil {
log.Errorf("Failed to read all shard names: %+v", err)
return shardsWatched
}
for ks, shards := range allShardNames {
for _, shard := range shards {
shardsWatched[ks+"."+shard] = 1
}
}
return shardsWatched
}

// refreshAllKeyspacesAndShardsMu ensures RefreshAllKeyspacesAndShards
// is not executed concurrently.
var refreshAllKeyspacesAndShardsMu sync.Mutex
Expand Down
42 changes: 30 additions & 12 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,28 +56,46 @@ var (
// This is populated by parsing `--clusters_to_watch` flag.
shardsToWatch map[string][]*topodatapb.KeyRange

statsTabletsWatched = stats.NewGaugesFuncWithMultiLabels(
"TabletsWatched",
// tablet stats
statsTabletsWatchedByCell = stats.NewGaugesFuncWithMultiLabels(
"TabletsWatchedByCell",
"Number of tablets watched by cell",
[]string{"cell"},
getTabletsWatchedByCell,
[]string{"Cell"},
getTabletsWatchedByCellStats,
)
statsTabletsWatchedByShard = stats.NewGaugesFuncWithMultiLabels(
"TabletsWatchedByShard",
"Number of tablets watched by keyspace/shard",
[]string{"Keyspace", "Shard"},
getTabletsWatchedByShardStats,
)

// ErrNoPrimaryTablet is a fixed error message.
ErrNoPrimaryTablet = errors.New("no primary tablet found")
)

func getTabletsWatchedByCell() map[string]int64 {
tabletsWatchedByCell := make(map[string]int64)
tabletsByCell, err := inst.ReadTabletCountsByCell()
if err == nil {
// getTabletsWatchedByCellStats returns the number of tablets watched by cell in stats format.
func getTabletsWatchedByCellStats() map[string]int64 {
tabletCountsByCell, err := inst.ReadTabletCountsByCell()
if err != nil {
log.Errorf("Failed to read tablet counts by cell: %+v", err)
return tabletsWatchedByCell
}
for cell, tabletCount := range tabletsByCell {
tabletsWatchedByCell[cell] = tabletCount
return tabletCountsByCell
}

// getTabletsWatchedByShardStats returns the number of tablets watched by keyspace/shard in stats format.
func getTabletsWatchedByShardStats() map[string]int64 {
tabletsWatchedByShard := make(map[string]int64)
tabletCountsByKS, err := inst.ReadTabletCountsByKeyspaceShard()
if err != nil {
log.Errorf("Failed to read tablet counts by shard: %+v", err)
}
for keyspace, countsByShard := range tabletCountsByKS {
for shard, tabletCount := range countsByShard {
tabletsWatchedByShard[keyspace+"."+shard] = tabletCount
}
}
return tabletsWatchedByCell
return tabletsWatchedByShard
}

// RegisterFlags registers the flags required by VTOrc
Expand Down
Loading