Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions go/vt/vtorc/inst/shard_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,30 @@ import (
// ErrShardNotFound is a fixed error message used when a shard is not found in the database.
var ErrShardNotFound = errors.New("shard not found")

// ReadShardNames reads the names of vitess shards for a single keyspace.
func ReadShardNames(keyspaceName string) (shardNames []string, err error) {
shardNames = make([]string, 0)
query := `select shard from vitess_shard where keyspace = ?`
args := sqlutils.Args(keyspaceName)
err = db.QueryVTOrc(query, args, func(row sqlutils.RowMap) error {
shardNames = append(shardNames, row.GetString("shard"))
return nil
})
return shardNames, err
}

// ReadAllShardNames reads the names of all vitess shards by keyspace.
func ReadAllShardNames() (shardNames map[string][]string, err error) {
shardNames = make(map[string][]string)
query := `select keyspace, shard from vitess_shard`
err = db.QueryVTOrc(query, nil, func(row sqlutils.RowMap) error {
ks := row.GetString("keyspace")
shardNames[ks] = append(shardNames[ks], row.GetString("shard"))
return nil
})
return shardNames, err
}

// ReadShardPrimaryInformation reads the vitess shard record and gets the shard primary alias and timestamp.
func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias string, primaryTimestamp string, err error) {
if err = topo.ValidateKeyspaceName(keyspaceName); err != nil {
Expand Down Expand Up @@ -95,3 +119,16 @@ func getShardPrimaryTermStartTimeString(shard *topo.ShardInfo) string {
}
return protoutil.TimeFromProto(shard.PrimaryTermStartTime).UTC().String()
}

// DeleteShard deletes a shard using a keyspace and shard name.
func DeleteShard(keyspace, shard string) error {
_, err := db.ExecVTOrc(`DELETE FROM
vitess_shard
WHERE
keyspace = ?
AND shard = ?`,
keyspace,
shard,
)
return err
}
13 changes: 12 additions & 1 deletion go/vt/vtorc/inst/shard_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"vitess.io/vitess/go/vt/vtorc/db"
)

func TestSaveAndReadShard(t *testing.T) {
func TestSaveReadAndDeleteShard(t *testing.T) {
// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
defer func() {
db.ClearVTOrcDatabase()
Expand Down Expand Up @@ -94,6 +94,7 @@ func TestSaveAndReadShard(t *testing.T) {
require.NoError(t, err)
}

// ReadShardPrimaryInformation
shardPrimaryAlias, primaryTimestamp, err := ReadShardPrimaryInformation(tt.keyspaceName, tt.shardName)
if tt.err != "" {
require.EqualError(t, err, tt.err)
Expand All @@ -102,6 +103,16 @@ func TestSaveAndReadShard(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, tt.primaryAliasWanted, shardPrimaryAlias)
require.EqualValues(t, tt.primaryTimestampWanted, primaryTimestamp)

// ReadShardNames
shardNames, err := ReadShardNames(tt.keyspaceName)
require.NoError(t, err)
require.Equal(t, []string{tt.shardName}, shardNames)

// DeleteShard
require.NoError(t, DeleteShard(tt.keyspaceName, tt.shardName))
_, _, err = ReadShardPrimaryInformation(tt.keyspaceName, tt.shardName)
require.EqualError(t, err, ErrShardNotFound.Error())
})
}
}
54 changes: 53 additions & 1 deletion go/vt/vtorc/logic/keyspace_shard_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,43 @@ import (

"golang.org/x/exp/maps"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtorc/inst"
)

var statsShardsWatched = stats.NewGaugesFuncWithMultiLabels("ShardsWatched",
"Keyspace/shards currently watched",
[]string{"Keyspace", "Shard"},
getShardsWatchedStats)

// getShardsWatchedStats returns the keyspace/shards watched in a format for stats.
func getShardsWatchedStats() map[string]int64 {
shardsWatched := make(map[string]int64)
allShardNames, err := inst.ReadAllShardNames()
if err != nil {
log.Errorf("Failed to read all shard names: %+v", err)
return shardsWatched
}
for ks, shards := range allShardNames {
for _, shard := range shards {
shardsWatched[ks+"."+shard] = 1
}
}
return shardsWatched
}

// refreshAllKeyspacesAndShardsMu ensures RefreshAllKeyspacesAndShards
// is not executed concurrently.
var refreshAllKeyspacesAndShardsMu sync.Mutex

// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with.
func RefreshAllKeyspacesAndShards(ctx context.Context) error {
refreshAllKeyspacesAndShardsMu.Lock()
defer refreshAllKeyspacesAndShardsMu.Unlock()

var keyspaces []string
if len(shardsToWatch) == 0 { // all known keyspaces
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
Expand Down Expand Up @@ -109,6 +138,7 @@ func refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error {

// refreshAllShards refreshes all the shard records in the given keyspace.
func refreshAllShards(ctx context.Context, keyspaceName string) error {
// get all shards for keyspace name.
shardInfos, err := ts.FindAllShardsInKeyspace(ctx, keyspaceName, &topo.FindAllShardsInKeyspaceOptions{
// Fetch shard records concurrently to speed up discovery. A typical
// Vitess cluster will have 1-3 vtorc instances deployed, so there is
Expand All @@ -119,13 +149,35 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error {
log.Error(err)
return err
}
savedShards := make(map[string]bool, len(shardInfos))
for _, shardInfo := range shardInfos {
err = inst.SaveShard(shardInfo)
if err != nil {
log.Error(err)
return err
}
savedShards[shardInfo.ShardName()] = true
}

// delete shards that were not returned by ts.FindAllShardsInKeyspace(...),
// indicating they are stale.
shards, err := inst.ReadShardNames(keyspaceName)
if err != nil {
return err
}
for _, shard := range shards {
if savedShards[shard] {
continue
}
shardName := topoproto.KeyspaceShardString(keyspaceName, shard)
log.Infof("Forgetting shard: %s", shardName)
err = inst.DeleteShard(keyspaceName, shard)
if err != nil {
log.Errorf("Failed to delete shard %s: %+v", shardName, err)
return err
}
}

return nil
}

Expand Down
Loading