diff --git a/go/vt/topo/consultopo/server.go b/go/vt/topo/consultopo/server.go index ab61a40b1e8..c04d6ed5c4d 100644 --- a/go/vt/topo/consultopo/server.go +++ b/go/vt/topo/consultopo/server.go @@ -30,6 +30,7 @@ import ( "github.com/spf13/pflag" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" @@ -92,6 +93,10 @@ func getClientCreds() (creds map[string]*ClientAuthCred, err error) { err = vterrors.Wrapf(err, "Error parsing consul_auth_static_file") return creds, err } + if len(creds) == 0 { + err = vterrors.New(vtrpc.Code_FAILED_PRECONDITION, "Found no credentials in consul_auth_static_file") + return creds, err + } return creds, nil } diff --git a/go/vt/topo/consultopo/server_flaky_test.go b/go/vt/topo/consultopo/server_flaky_test.go index a987336dd01..3a3a6ad3205 100644 --- a/go/vt/topo/consultopo/server_flaky_test.go +++ b/go/vt/topo/consultopo/server_flaky_test.go @@ -26,11 +26,10 @@ import ( "testing" "time" - "vitess.io/vitess/go/vt/log" - "github.com/hashicorp/consul/api" "vitess.io/vitess/go/testfiles" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/test" @@ -297,25 +296,42 @@ func TestConsulTopoWithAuthFailure(t *testing.T) { consulAuthClientStaticFile = tmpFile.Name() - jsonConfig := "{\"global\":{\"acl_token\":\"badtoken\"}}" - if err := os.WriteFile(tmpFile.Name(), []byte(jsonConfig), 0600); err != nil { - t.Fatalf("couldn't write temp file: %v", err) - } + // check valid, empty json causes error + { + jsonConfig := "{}" + if err := os.WriteFile(tmpFile.Name(), []byte(jsonConfig), 0600); err != nil { + t.Fatalf("couldn't write temp file: %v", err) + } - // Create the server on the new root. - ts, err := topo.OpenServer("consul", serverAddr, path.Join("globalRoot", topo.GlobalCell)) - if err != nil { - t.Fatalf("OpenServer() failed: %v", err) + // Create the server on the new root. + _, err := topo.OpenServer("consul", serverAddr, path.Join("globalRoot", topo.GlobalCell)) + if err == nil { + t.Fatal("Expected OpenServer() to return an error due to bad config, got nil") + } } - // Attempt to Create the CellInfo. - err = ts.CreateCellInfo(context.Background(), test.LocalCellName, &topodatapb.CellInfo{ - ServerAddress: serverAddr, - Root: path.Join("globalRoot", test.LocalCellName), - }) + // check bad token causes error + { + jsonConfig := "{\"global\":{\"acl_token\":\"badtoken\"}}" + if err := os.WriteFile(tmpFile.Name(), []byte(jsonConfig), 0600); err != nil { + t.Fatalf("couldn't write temp file: %v", err) + } + + // Create the server on the new root. + ts, err := topo.OpenServer("consul", serverAddr, path.Join("globalRoot", topo.GlobalCell)) + if err != nil { + t.Fatalf("OpenServer() failed: %v", err) + } + + // Attempt to Create the CellInfo. + err = ts.CreateCellInfo(context.Background(), test.LocalCellName, &topodatapb.CellInfo{ + ServerAddress: serverAddr, + Root: path.Join("globalRoot", test.LocalCellName), + }) - want := "Failed request: ACL not found" - if err == nil || err.Error() != want { - t.Errorf("Expected CreateCellInfo to fail: got %v, want %s", err, want) + want := "Failed request: ACL not found" + if err == nil || err.Error() != want { + t.Errorf("Expected CreateCellInfo to fail: got %v, want %s", err, want) + } } } diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index aa41ed7e64e..3a5c0cb8888 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -23,7 +23,6 @@ import ( "slices" "strings" "sync" - "sync/atomic" "time" "github.com/spf13/pflag" @@ -45,11 +44,10 @@ import ( ) var ( - ts *topo.Server - tmc tmclient.TabletManagerClient - clustersToWatch []string - shutdownWaitTime = 30 * time.Second - shardsLockCounter int32 + ts *topo.Server + tmc tmclient.TabletManagerClient + clustersToWatch []string + shutdownWaitTime = 30 * time.Second // shardsToWatch is a map storing the shards for a given keyspace that need to be watched. // We store the key range for all the shards that we want to watch. // This is populated by parsing `--clusters_to_watch` flag. @@ -349,35 +347,6 @@ func refreshTablets(tablets []*topo.TabletInfo, query string, args []any, loader } } -func getLockAction(analysedInstance string, code inst.AnalysisCode) string { - return fmt.Sprintf("VTOrc Recovery for %v on %v", code, analysedInstance) -} - -// LockShard locks the keyspace-shard preventing others from performing conflicting actions. -func LockShard(ctx context.Context, keyspace, shard, lockAction string) (context.Context, func(*error), error) { - if keyspace == "" { - return nil, nil, errors.New("can't lock shard: keyspace is unspecified") - } - if shard == "" { - return nil, nil, errors.New("can't lock shard: shard name is unspecified") - } - val := atomic.LoadInt32(&hasReceivedSIGTERM) - if val > 0 { - return nil, nil, errors.New("can't lock shard: SIGTERM received") - } - - atomic.AddInt32(&shardsLockCounter, 1) - ctx, unlock, err := ts.TryLockShard(ctx, keyspace, shard, lockAction) - if err != nil { - atomic.AddInt32(&shardsLockCounter, -1) - return nil, nil, err - } - return ctx, func(e *error) { - defer atomic.AddInt32(&shardsLockCounter, -1) - unlock(e) - }, nil -} - // tabletUndoDemotePrimary calls the said RPC for the given tablet. func tabletUndoDemotePrimary(ctx context.Context, tablet *topodatapb.Tablet, semiSync bool) error { tmcCtx, tmcCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index e6105c0691d..1eee1c1c4e6 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -19,8 +19,10 @@ package logic import ( "context" "encoding/json" + "errors" "fmt" "math/rand/v2" + "sync/atomic" "time" "vitess.io/vitess/go/stats" @@ -69,6 +71,9 @@ var ( "Shard", }) + // shardsLockCounter is a count of in-flight shard locks. Use atomics to read/update. + shardsLockCounter int64 + // recoveriesCounter counts the number of recoveries that VTOrc has performed recoveriesCounter = stats.NewCountersWithSingleLabel("RecoveriesCount", "Count of the different recoveries performed", "RecoveryType", actionableRecoveriesNames...) @@ -77,6 +82,10 @@ var ( // recoveriesFailureCounter counts the number of failed recoveries that VTOrc has performed recoveriesFailureCounter = stats.NewCountersWithSingleLabel("FailedRecoveries", "Count of the different failed recoveries performed", "RecoveryType", actionableRecoveriesNames...) + + // shardLockTimings measures the timing of LockShard operations. + shardLockTimingsActions = []string{"Lock", "Unlock"} + shardLockTimings = stats.NewTimings("ShardLockTimings", "Timings of global shard locks", "Action", shardLockTimingsActions...) ) // recoveryFunction is the code of the recovery function to be used @@ -144,6 +153,10 @@ func NewTopologyRecoveryStep(id int64, message string) *TopologyRecoveryStep { } func init() { + // ShardLocksActive is a stats representation of shardsLockCounter. + stats.NewGaugeFunc("ShardLocksActive", "Number of actively-held shard locks", func() int64 { + return atomic.LoadInt64(&shardsLockCounter) + }) go initializeTopologyRecoveryPostConfiguration() } @@ -151,6 +164,45 @@ func initializeTopologyRecoveryPostConfiguration() { config.WaitForConfigurationToBeLoaded() } +func getLockAction(analysedInstance string, code inst.AnalysisCode) string { + return fmt.Sprintf("VTOrc Recovery for %v on %v", code, analysedInstance) +} + +// LockShard locks the keyspace-shard preventing others from performing conflicting actions. +func LockShard(ctx context.Context, keyspace, shard, lockAction string) (context.Context, func(*error), error) { + if keyspace == "" { + return nil, nil, errors.New("can't lock shard: keyspace is unspecified") + } + if shard == "" { + return nil, nil, errors.New("can't lock shard: shard name is unspecified") + } + val := atomic.LoadInt32(&hasReceivedSIGTERM) + if val > 0 { + return nil, nil, errors.New("can't lock shard: SIGTERM received") + } + + startTime := time.Now() + defer func() { + lockTime := time.Since(startTime) + shardLockTimings.Add("Lock", lockTime) + }() + + atomic.AddInt64(&shardsLockCounter, 1) + ctx, unlock, err := ts.TryLockShard(ctx, keyspace, shard, lockAction) + if err != nil { + atomic.AddInt64(&shardsLockCounter, -1) + return nil, nil, err + } + return ctx, func(e *error) { + startTime := time.Now() + defer func() { + atomic.AddInt64(&shardsLockCounter, -1) + shardLockTimings.Add("Unlock", time.Since(startTime)) + }() + unlock(e) + }, nil +} + // AuditTopologyRecovery audits a single step in a topology recovery process. func AuditTopologyRecovery(topologyRecovery *TopologyRecovery, message string) error { log.Infof("topology_recovery: %s", message) diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index b369324c593..f0fd4c9725a 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -61,6 +61,9 @@ var ( discoveryRecentCountGauge = stats.NewGauge("DiscoveriesRecentCount", "Number of recent discoveries") discoveryWorkersGauge = stats.NewGauge("DiscoveryWorkers", "Number of discovery workers") discoveryWorkersActiveGauge = stats.NewGauge("DiscoveryWorkersActive", "Number of discovery workers actively discovering tablets") + + discoverInstanceTimingsActions = []string{"Backend", "Instance", "Other"} + discoverInstanceTimings = stats.NewTimings("DiscoverInstanceTimings", "Timings for instance discovery actions", "Action", discoverInstanceTimingsActions...) ) var discoveryMetrics = collection.CreateOrReturnCollection(DiscoveryMetricsName) @@ -118,7 +121,7 @@ func closeVTOrc() { func waitForLocksRelease() { timeout := time.After(shutdownWaitTime) for { - count := atomic.LoadInt32(&shardsLockCounter) + count := atomic.LoadInt64(&shardsLockCounter) if count == 0 { break } @@ -215,6 +218,11 @@ func DiscoverInstance(tabletAlias string, forceDiscovery bool) { totalLatency := latency.Elapsed("total") backendLatency := latency.Elapsed("backend") instanceLatency := latency.Elapsed("instance") + otherLatency := totalLatency - (backendLatency + instanceLatency) + + discoverInstanceTimings.Add("Backend", backendLatency) + discoverInstanceTimings.Add("Instance", instanceLatency) + discoverInstanceTimings.Add("Other", otherLatency) if forceDiscovery { log.Infof("Force discovered - %+v, err - %v", instance, err) diff --git a/go/vt/vtorc/logic/vtorc_test.go b/go/vt/vtorc/logic/vtorc_test.go index edd8141e8b7..42464c9f8c7 100644 --- a/go/vt/vtorc/logic/vtorc_test.go +++ b/go/vt/vtorc/logic/vtorc_test.go @@ -29,10 +29,10 @@ func TestWaitForLocksRelease(t *testing.T) { t.Run("Timeout from shutdownWaitTime", func(t *testing.T) { // Increment shardsLockCounter to simulate locking of a shard - atomic.AddInt32(&shardsLockCounter, +1) + atomic.AddInt64(&shardsLockCounter, +1) defer func() { // Restore the initial value - atomic.StoreInt32(&shardsLockCounter, 0) + atomic.StoreInt64(&shardsLockCounter, 0) }() shutdownWaitTime = 200 * time.Millisecond timeSpent := waitForLocksReleaseAndGetTimeWaitedFor() @@ -42,12 +42,12 @@ func TestWaitForLocksRelease(t *testing.T) { t.Run("Successful wait for locks release", func(t *testing.T) { // Increment shardsLockCounter to simulate locking of a shard - atomic.AddInt32(&shardsLockCounter, +1) + atomic.AddInt64(&shardsLockCounter, +1) shutdownWaitTime = 500 * time.Millisecond // Release the locks after 200 milliseconds go func() { time.Sleep(200 * time.Millisecond) - atomic.StoreInt32(&shardsLockCounter, 0) + atomic.StoreInt64(&shardsLockCounter, 0) }() timeSpent := waitForLocksReleaseAndGetTimeWaitedFor() assert.Greater(t, timeSpent, 100*time.Millisecond, "waitForLocksRelease should wait for the locks and not return early")