diff --git a/changelog/22.0/22.0.0/summary.md b/changelog/22.0/22.0.0/summary.md
index b2c5c029851..1e31214cd84 100644
--- a/changelog/22.0/22.0.0/summary.md
+++ b/changelog/22.0/22.0.0/summary.md
@@ -15,6 +15,7 @@
- **[Stalled Disk Recovery in VTOrc](#stall-disk-recovery)**
- **[Update default MySQL version to 8.0.40](#mysql-8-0-40)**
- **[Update lite images to Debian Bookworm](#debian-bookworm)**
+ - **[KeyRanges in `--clusters_to_watch` in VTOrc](#key-range-vtorc)**
- **[Support for Filtering Query logs on Error](#query-logs)**
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**
@@ -135,6 +136,11 @@ This is the last time this will be needed in the `8.0.x` series, as starting wit
The base system now uses Debian Bookworm instead of Debian Bullseye for the `vitess/lite` images. This change was brought by [Pull Request #17552].
+### KeyRanges in `--clusters_to_watch` in VTOrc
+VTOrc now supports specifying keyranges in the `--clusters_to_watch` flag. This means that there is no need to restart a VTOrc instance with a different flag value when you reshard a keyspace.
+For example, if a VTOrc is configured to watch `ks/-80`, then it would watch all the shards that fall under the keyrange `-80`. If a reshard is performed and `-80` is split into new shards `-40` and `40-80`, the VTOrc instance will automatically start watching the new shards without needing a restart. In the previous logic, specifying `ks/-80` for the flag would mean that VTOrc would watch only 1 (or no) shard. In the new system, since we interpret `-80` as a key range, it can watch multiple shards as described in the example.
+Users can continue to specify exact keyranges. The new feature is backward compatible.
+
### Support for Filtering Query logs on Error
The `querylog-mode` setting can be configured to `error` to log only queries that result in errors. This option is supported in both VTGate and VTTablet.
diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt
index ca8083709e5..57eb907cf4d 100644
--- a/go/flags/endtoend/vtorc.txt
+++ b/go/flags/endtoend/vtorc.txt
@@ -24,7 +24,7 @@ Flags:
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
--change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED
- --clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
+ --clusters_to_watch strings Comma-separated list of keyspaces or keyspace/keyranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
--config-file string Full path of the config file (with extension) to use. If set, --config-path, --config-type, and --config-name are ignored.
--config-file-not-found-handling ConfigFileNotFoundHandling Behavior when a config file is not found. (Options: error, exit, ignore, warn) (default warn)
--config-name string Name of the config file (without extension) to search for. (default "vtconfig")
diff --git a/go/vt/key/key.go b/go/vt/key/key.go
index 89d956bd433..82852daa16e 100644
--- a/go/vt/key/key.go
+++ b/go/vt/key/key.go
@@ -95,6 +95,14 @@ func NewKeyRange(start []byte, end []byte) *topodatapb.KeyRange {
return &topodatapb.KeyRange{Start: start, End: end}
}
+// NewCompleteKeyRange returns a complete key range.
+func NewCompleteKeyRange() *topodatapb.KeyRange {
+ return &topodatapb.KeyRange{
+ Start: nil,
+ End: nil,
+ }
+}
+
// KeyRangeAdd adds two adjacent KeyRange values (in any order) into a single value. If the values are not adjacent,
// it returns false.
func KeyRangeAdd(a, b *topodatapb.KeyRange) (*topodatapb.KeyRange, bool) {
diff --git a/go/vt/topo/shard_test.go b/go/vt/topo/shard_test.go
index 6bd4aae5b62..915bcd18e3c 100644
--- a/go/vt/topo/shard_test.go
+++ b/go/vt/topo/shard_test.go
@@ -323,6 +323,14 @@ func TestValidateShardName(t *testing.T) {
},
valid: true,
},
+ {
+ name: "-",
+ expectedRange: &topodatapb.KeyRange{
+ Start: []byte{},
+ End: []byte{},
+ },
+ valid: true,
+ },
{
name: "40-80",
expectedRange: &topodatapb.KeyRange{
diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery.go b/go/vt/vtorc/logic/keyspace_shard_discovery.go
index 0dd17cb65fd..8115e614418 100644
--- a/go/vt/vtorc/logic/keyspace_shard_discovery.go
+++ b/go/vt/vtorc/logic/keyspace_shard_discovery.go
@@ -18,10 +18,10 @@ package logic
import (
"context"
- "sort"
- "strings"
"sync"
+ "golang.org/x/exp/maps"
+
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
@@ -31,7 +31,7 @@ import (
// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with.
func RefreshAllKeyspacesAndShards(ctx context.Context) error {
var keyspaces []string
- if len(clustersToWatch) == 0 { // all known keyspaces
+ if len(shardsToWatch) == 0 { // all known keyspaces
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
var err error
@@ -41,26 +41,10 @@ func RefreshAllKeyspacesAndShards(ctx context.Context) error {
return err
}
} else {
- // Parse input and build list of keyspaces
- for _, ks := range clustersToWatch {
- if strings.Contains(ks, "/") {
- // This is a keyspace/shard specification
- input := strings.Split(ks, "/")
- keyspaces = append(keyspaces, input[0])
- } else {
- // Assume this is a keyspace
- keyspaces = append(keyspaces, ks)
- }
- }
- if len(keyspaces) == 0 {
- log.Errorf("Found no keyspaces for input: %+v", clustersToWatch)
- return nil
- }
+ // Get keyspaces to watch from the list of known keyspaces.
+ keyspaces = maps.Keys(shardsToWatch)
}
- // Sort the list of keyspaces.
- // The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace
- sort.Strings(keyspaces)
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go
index 8218af45db6..f05295416d0 100644
--- a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go
+++ b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go
@@ -93,6 +93,8 @@ func TestRefreshAllKeyspaces(t *testing.T) {
// Set clusters to watch to only watch ks1 and ks3
onlyKs1and3 := []string{"ks1/-80", "ks3/-80", "ks3/80-"}
clustersToWatch = onlyKs1and3
+ err := initializeShardsToWatch()
+ require.NoError(t, err)
require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))
// Verify that we only have ks1 and ks3 in vtorc's db.
@@ -106,6 +108,8 @@ func TestRefreshAllKeyspaces(t *testing.T) {
// Set clusters to watch to watch all keyspaces
clustersToWatch = nil
+ err = initializeShardsToWatch()
+ require.NoError(t, err)
// Change the durability policy of ks1
reparenttestutil.SetKeyspaceDurability(ctx, t, ts, "ks1", policy.DurabilitySemiSync)
require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))
@@ -119,7 +123,6 @@ func TestRefreshAllKeyspaces(t *testing.T) {
verifyPrimaryAlias(t, "ks3", "80-", "zone_ks3-0000000101", "")
verifyKeyspaceInfo(t, "ks4", keyspaceDurabilityTest, "")
verifyPrimaryAlias(t, "ks4", "80-", "zone_ks4-0000000101", "")
-
}
func TestRefreshKeyspace(t *testing.T) {
diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go
index eb10bb2a667..c5c23df0cd0 100644
--- a/go/vt/vtorc/logic/tablet_discovery.go
+++ b/go/vt/vtorc/logic/tablet_discovery.go
@@ -32,6 +32,7 @@ import (
"google.golang.org/protobuf/proto"
"vitess.io/vitess/go/vt/external/golib/sqlutils"
+ "vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
@@ -48,8 +49,10 @@ var (
clustersToWatch []string
shutdownWaitTime = 30 * time.Second
shardsLockCounter int32
- shardsToWatch map[string][]string
- shardsToWatchMu sync.Mutex
+ // shardsToWatch is a map storing the shards for a given keyspace that need to be watched.
+ // We store the key range for all the shards that we want to watch.
+ // This is populated by parsing `--clusters_to_watch` flag.
+ shardsToWatch map[string][]*topodatapb.KeyRange
// ErrNoPrimaryTablet is a fixed error message.
ErrNoPrimaryTablet = errors.New("no primary tablet found")
@@ -57,18 +60,18 @@ var (
// RegisterFlags registers the flags required by VTOrc
func RegisterFlags(fs *pflag.FlagSet) {
- fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
+ fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/keyranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM")
}
-// updateShardsToWatch parses the --clusters_to_watch flag-value
+// initializeShardsToWatch parses the --clusters_to_watch flag-value
// into a map of keyspace/shards.
-func updateShardsToWatch() {
+func initializeShardsToWatch() error {
+ shardsToWatch = make(map[string][]*topodatapb.KeyRange)
if len(clustersToWatch) == 0 {
- return
+ return nil
}
- newShardsToWatch := make(map[string][]string, 0)
for _, ks := range clustersToWatch {
if strings.Contains(ks, "/") && !strings.HasSuffix(ks, "/") {
// Validate keyspace/shard parses.
@@ -77,34 +80,50 @@ func updateShardsToWatch() {
log.Errorf("Could not parse keyspace/shard %q: %+v", ks, err)
continue
}
- newShardsToWatch[k] = append(newShardsToWatch[k], s)
+ if !key.IsValidKeyRange(s) {
+ return fmt.Errorf("invalid key range %q while parsing clusters to watch", s)
+ }
+ // Parse the shard name into key range value.
+ keyRanges, err := key.ParseShardingSpec(s)
+ if err != nil {
+ return fmt.Errorf("could not parse shard name %q: %+v", s, err)
+ }
+ shardsToWatch[k] = append(shardsToWatch[k], keyRanges...)
} else {
- ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
- defer cancel()
- // Assume this is a keyspace and find all shards in keyspace.
// Remove trailing slash if exists.
ks = strings.TrimSuffix(ks, "/")
- shards, err := ts.GetShardNames(ctx, ks)
- if err != nil {
- // Log the err and continue.
- log.Errorf("Error fetching shards for keyspace: %v", ks)
- continue
- }
- if len(shards) == 0 {
- log.Errorf("Topo has no shards for ks: %v", ks)
- continue
- }
- newShardsToWatch[ks] = shards
+ // We store the entire range of key range if nothing is specified.
+ shardsToWatch[ks] = []*topodatapb.KeyRange{key.NewCompleteKeyRange()}
}
}
- if len(newShardsToWatch) == 0 {
- log.Error("No keyspace/shards to watch")
- return
+
+ if len(shardsToWatch) == 0 {
+ log.Error("No keyspace/shards to watch, watching all keyspaces")
}
+ return nil
+}
- shardsToWatchMu.Lock()
- defer shardsToWatchMu.Unlock()
- shardsToWatch = newShardsToWatch
+// shouldWatchTablet checks if the given tablet is part of the watch list.
+func shouldWatchTablet(tablet *topodatapb.Tablet) bool {
+ // If we are watching all keyspaces, then we want to watch this tablet too.
+ if len(shardsToWatch) == 0 {
+ return true
+ }
+ shardRanges, ok := shardsToWatch[tablet.GetKeyspace()]
+ // If we don't have the keyspace in our map, then this tablet
+ // doesn't need to be watched.
+ if !ok {
+ return false
+ }
+ // Get the tablet's key range, and check if
+ // it is part of the shard ranges we are watching.
+ kr := tablet.GetKeyRange()
+ for _, shardRange := range shardRanges {
+ if key.KeyRangeContainsKeyRange(shardRange, kr) {
+ return true
+ }
+ }
+ return false
}
// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker
@@ -117,7 +136,10 @@ func OpenTabletDiscovery() <-chan time.Time {
log.Error(err)
}
// Parse --clusters_to_watch into a filter.
- updateShardsToWatch()
+ err := initializeShardsToWatch()
+ if err != nil {
+ log.Fatalf("Error parsing --clusters-to-watch: %v", err)
+ }
// We refresh all information from the topo once before we start the ticks to do
// it on a timer.
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
@@ -179,16 +201,10 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f
// Filter tablets that should not be watched using shardsToWatch map.
matchedTablets := make([]*topo.TabletInfo, 0, len(tablets))
func() {
- shardsToWatchMu.Lock()
- defer shardsToWatchMu.Unlock()
for _, t := range tablets {
- if len(shardsToWatch) > 0 {
- _, ok := shardsToWatch[t.Tablet.Keyspace]
- if !ok || !slices.Contains(shardsToWatch[t.Tablet.Keyspace], t.Tablet.Shard) {
- continue // filter
- }
+ if shouldWatchTablet(t.Tablet) {
+ matchedTablets = append(matchedTablets, t)
}
- matchedTablets = append(matchedTablets, t)
}
}()
diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go
index 54284e8a017..4514ef81724 100644
--- a/go/vt/vtorc/logic/tablet_discovery_test.go
+++ b/go/vt/vtorc/logic/tablet_discovery_test.go
@@ -30,6 +30,7 @@ import (
"google.golang.org/protobuf/proto"
"vitess.io/vitess/go/vt/external/golib/sqlutils"
+ "vitess.io/vitess/go/vt/key"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vttime"
"vitess.io/vitess/go/vt/topo"
@@ -102,60 +103,200 @@ var (
}
)
-func TestUpdateShardsToWatch(t *testing.T) {
+func TestShouldWatchTablet(t *testing.T) {
oldClustersToWatch := clustersToWatch
- oldTs := ts
defer func() {
clustersToWatch = oldClustersToWatch
shardsToWatch = nil
- ts = oldTs
}()
- // Create a memory topo-server and create the keyspace and shard records
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
+ testCases := []struct {
+ in []string
+ tablet *topodatapb.Tablet
+ expectedShouldWatch bool
+ }{
+ {
+ in: []string{},
+ tablet: &topodatapb.Tablet{
+ Keyspace: keyspace,
+ Shard: shard,
+ },
+ expectedShouldWatch: true,
+ },
+ {
+ in: []string{keyspace},
+ tablet: &topodatapb.Tablet{
+ Keyspace: keyspace,
+ Shard: shard,
+ },
+ expectedShouldWatch: true,
+ },
+ {
+ in: []string{keyspace + "/-"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: keyspace,
+ Shard: shard,
+ },
+ expectedShouldWatch: true,
+ },
+ {
+ in: []string{keyspace + "/" + shard},
+ tablet: &topodatapb.Tablet{
+ Keyspace: keyspace,
+ Shard: shard,
+ },
+ expectedShouldWatch: true,
+ },
+ {
+ in: []string{"ks/-70", "ks/70-"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x50}, []byte{0x70}),
+ },
+ expectedShouldWatch: true,
+ },
+ {
+ in: []string{"ks/-70", "ks/70-"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x40}, []byte{0x50}),
+ },
+ expectedShouldWatch: true,
+ },
+ {
+ in: []string{"ks/-70", "ks/70-"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x70}, []byte{0x90}),
+ },
+ expectedShouldWatch: true,
+ },
+ {
+ in: []string{"ks/-70", "ks/70-"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x60}, []byte{0x90}),
+ },
+ expectedShouldWatch: false,
+ },
+ {
+ in: []string{"ks/50-70"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x50}, []byte{0x70}),
+ },
+ expectedShouldWatch: true,
+ },
+ {
+ in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x60}, []byte{0x80}),
+ },
+ expectedShouldWatch: true,
+ },
+ {
+ in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x80}, []byte{0x90}),
+ },
+ expectedShouldWatch: false,
+ },
+ {
+ in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x90}, []byte{0xa0}),
+ },
+ expectedShouldWatch: false,
+ },
+ }
- ts = memorytopo.NewServer(ctx, cell1)
- _, err := ts.GetOrCreateShard(context.Background(), keyspace, shard)
- require.NoError(t, err)
+ for _, tt := range testCases {
+ t.Run(fmt.Sprintf("%v-Tablet-%v-%v", strings.Join(tt.in, ","), tt.tablet.GetKeyspace(), tt.tablet.GetShard()), func(t *testing.T) {
+ clustersToWatch = tt.in
+ err := initializeShardsToWatch()
+ require.NoError(t, err)
+ assert.Equal(t, tt.expectedShouldWatch, shouldWatchTablet(tt.tablet))
+ })
+ }
+}
+
+// TestInitializeShardsToWatch tests that we initialize the shardsToWatch map correctly
+// using the `--clusters_to_watch` flag.
+func TestInitializeShardsToWatch(t *testing.T) {
+ oldClustersToWatch := clustersToWatch
+ defer func() {
+ clustersToWatch = oldClustersToWatch
+ shardsToWatch = nil
+ }()
testCases := []struct {
- in []string
- expected map[string][]string
+ in []string
+ expected map[string][]*topodatapb.KeyRange
+ expectedErr string
}{
{
in: []string{},
- expected: nil,
+ expected: map[string][]*topodatapb.KeyRange{},
},
{
- in: []string{""},
- expected: map[string][]string{},
+ in: []string{"unknownKs"},
+ expected: map[string][]*topodatapb.KeyRange{
+ "unknownKs": {
+ key.NewCompleteKeyRange(),
+ },
+ },
},
{
in: []string{"test/-"},
- expected: map[string][]string{
- "test": {"-"},
+ expected: map[string][]*topodatapb.KeyRange{
+ "test": {
+ key.NewCompleteKeyRange(),
+ },
+ },
+ },
+ {
+ in: []string{"test/324"},
+ expectedErr: `invalid key range "324" while parsing clusters to watch`,
+ },
+ {
+ in: []string{"test/0"},
+ expected: map[string][]*topodatapb.KeyRange{
+ "test": {
+ key.NewCompleteKeyRange(),
+ },
},
},
{
in: []string{"test/-", "test2/-80", "test2/80-"},
- expected: map[string][]string{
- "test": {"-"},
- "test2": {"-80", "80-"},
+ expected: map[string][]*topodatapb.KeyRange{
+ "test": {
+ key.NewCompleteKeyRange(),
+ },
+ "test2": {
+ key.NewKeyRange(nil, []byte{0x80}),
+ key.NewKeyRange([]byte{0x80}, nil),
+ },
},
},
{
- // confirm shards fetch from topo
+ // known keyspace
in: []string{keyspace},
- expected: map[string][]string{
- keyspace: {shard},
+ expected: map[string][]*topodatapb.KeyRange{
+ keyspace: {
+ key.NewCompleteKeyRange(),
+ },
},
},
{
- // confirm shards fetch from topo when keyspace has trailing-slash
+ // keyspace with trailing-slash
in: []string{keyspace + "/"},
- expected: map[string][]string{
- keyspace: {shard},
+ expected: map[string][]*topodatapb.KeyRange{
+ keyspace: {
+ key.NewCompleteKeyRange(),
+ },
},
},
}
@@ -163,10 +304,15 @@ func TestUpdateShardsToWatch(t *testing.T) {
for _, testCase := range testCases {
t.Run(strings.Join(testCase.in, ","), func(t *testing.T) {
defer func() {
- shardsToWatch = make(map[string][]string, 0)
+ shardsToWatch = make(map[string][]*topodatapb.KeyRange, 0)
}()
clustersToWatch = testCase.in
- updateShardsToWatch()
+ err := initializeShardsToWatch()
+ if testCase.expectedErr != "" {
+ require.EqualError(t, err, testCase.expectedErr)
+ return
+ }
+ require.NoError(t, err)
require.Equal(t, testCase.expected, shardsToWatch)
})
}
diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go
index 1fde6e31c0d..5ac5af50d47 100644
--- a/go/vt/vtorc/logic/vtorc.go
+++ b/go/vt/vtorc/logic/vtorc.go
@@ -326,12 +326,6 @@ func refreshAllInformation(ctx context.Context) error {
return RefreshAllKeyspacesAndShards(ctx)
})
- // Refresh shards to watch.
- eg.Go(func() error {
- updateShardsToWatch()
- return nil
- })
-
// Refresh all tablets.
eg.Go(func() error {
return refreshAllTablets(ctx)