Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions go/vt/topo/consultopo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/spf13/pflag"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -92,6 +93,10 @@ func getClientCreds() (creds map[string]*ClientAuthCred, err error) {
err = vterrors.Wrapf(err, "Error parsing consul_auth_static_file")
return creds, err
}
if len(creds) == 0 {
err = vterrors.New(vtrpc.Code_FAILED_PRECONDITION, "Found no credentials in consul_auth_static_file")
return creds, err
}
return creds, nil
}

Expand Down
52 changes: 34 additions & 18 deletions go/vt/topo/consultopo/server_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@ import (
"testing"
"time"

"vitess.io/vitess/go/vt/log"

"github.com/hashicorp/consul/api"

"vitess.io/vitess/go/testfiles"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/test"

Expand Down Expand Up @@ -297,25 +296,42 @@ func TestConsulTopoWithAuthFailure(t *testing.T) {

consulAuthClientStaticFile = tmpFile.Name()

jsonConfig := "{\"global\":{\"acl_token\":\"badtoken\"}}"
if err := os.WriteFile(tmpFile.Name(), []byte(jsonConfig), 0600); err != nil {
t.Fatalf("couldn't write temp file: %v", err)
}
// check valid, empty json causes error
{
jsonConfig := "{}"
if err := os.WriteFile(tmpFile.Name(), []byte(jsonConfig), 0600); err != nil {
t.Fatalf("couldn't write temp file: %v", err)
}

// Create the server on the new root.
ts, err := topo.OpenServer("consul", serverAddr, path.Join("globalRoot", topo.GlobalCell))
if err != nil {
t.Fatalf("OpenServer() failed: %v", err)
// Create the server on the new root.
_, err := topo.OpenServer("consul", serverAddr, path.Join("globalRoot", topo.GlobalCell))
if err == nil {
t.Fatal("Expected OpenServer() to return an error due to bad config, got nil")
}
}

// Attempt to Create the CellInfo.
err = ts.CreateCellInfo(context.Background(), test.LocalCellName, &topodatapb.CellInfo{
ServerAddress: serverAddr,
Root: path.Join("globalRoot", test.LocalCellName),
})
// check bad token causes error
{
jsonConfig := "{\"global\":{\"acl_token\":\"badtoken\"}}"
if err := os.WriteFile(tmpFile.Name(), []byte(jsonConfig), 0600); err != nil {
t.Fatalf("couldn't write temp file: %v", err)
}

// Create the server on the new root.
ts, err := topo.OpenServer("consul", serverAddr, path.Join("globalRoot", topo.GlobalCell))
if err != nil {
t.Fatalf("OpenServer() failed: %v", err)
}

// Attempt to Create the CellInfo.
err = ts.CreateCellInfo(context.Background(), test.LocalCellName, &topodatapb.CellInfo{
ServerAddress: serverAddr,
Root: path.Join("globalRoot", test.LocalCellName),
})

want := "Failed request: ACL not found"
if err == nil || err.Error() != want {
t.Errorf("Expected CreateCellInfo to fail: got %v, want %s", err, want)
want := "Failed request: ACL not found"
if err == nil || err.Error() != want {
t.Errorf("Expected CreateCellInfo to fail: got %v, want %s", err, want)
}
}
}
39 changes: 4 additions & 35 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"slices"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/spf13/pflag"
Expand All @@ -45,11 +44,10 @@ import (
)

var (
ts *topo.Server
tmc tmclient.TabletManagerClient
clustersToWatch []string
shutdownWaitTime = 30 * time.Second
shardsLockCounter int32
ts *topo.Server
tmc tmclient.TabletManagerClient
clustersToWatch []string
shutdownWaitTime = 30 * time.Second
// shardsToWatch is a map storing the shards for a given keyspace that need to be watched.
// We store the key range for all the shards that we want to watch.
// This is populated by parsing `--clusters_to_watch` flag.
Expand Down Expand Up @@ -349,35 +347,6 @@ func refreshTablets(tablets []*topo.TabletInfo, query string, args []any, loader
}
}

// getLockAction builds the human-readable lock action string for a recovery,
// naming the analysis code first and the analysed instance second.
func getLockAction(analysedInstance string, code inst.AnalysisCode) string {
	const format = "VTOrc Recovery for %v on %v"
	return fmt.Sprintf(format, code, analysedInstance)
}

// LockShard takes the topo lock on the given keyspace-shard so that other
// actors cannot perform conflicting actions while it is held.
//
// An error is returned, and no lock is attempted, when keyspace or shard is
// empty or when hasReceivedSIGTERM has been set. On success the returned
// function releases the lock and then decrements shardsLockCounter.
func LockShard(ctx context.Context, keyspace, shard, lockAction string) (context.Context, func(*error), error) {
	if keyspace == "" {
		return nil, nil, errors.New("can't lock shard: keyspace is unspecified")
	}
	if shard == "" {
		return nil, nil, errors.New("can't lock shard: shard name is unspecified")
	}
	if atomic.LoadInt32(&hasReceivedSIGTERM) > 0 {
		return nil, nil, errors.New("can't lock shard: SIGTERM received")
	}

	// Count the lock as in flight before attempting it; roll back on failure.
	atomic.AddInt32(&shardsLockCounter, 1)
	lockedCtx, releaseLock, lockErr := ts.TryLockShard(ctx, keyspace, shard, lockAction)
	if lockErr != nil {
		atomic.AddInt32(&shardsLockCounter, -1)
		return nil, nil, lockErr
	}

	unlockAndCount := func(errPtr *error) {
		// Deferred so the counter drops even if the release panics.
		defer func() {
			atomic.AddInt32(&shardsLockCounter, -1)
		}()
		releaseLock(errPtr)
	}
	return lockedCtx, unlockAndCount, nil
}

// tabletUndoDemotePrimary calls the said RPC for the given tablet.
func tabletUndoDemotePrimary(ctx context.Context, tablet *topodatapb.Tablet, semiSync bool) error {
tmcCtx, tmcCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
Expand Down
52 changes: 52 additions & 0 deletions go/vt/vtorc/logic/topology_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package logic
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand/v2"
"sync/atomic"
"time"

"vitess.io/vitess/go/stats"
Expand Down Expand Up @@ -69,6 +71,9 @@ var (
"Shard",
})

// shardsLockCounter is a count of in-flight shard locks. Use atomics to read/update.
shardsLockCounter int64

// recoveriesCounter counts the number of recoveries that VTOrc has performed
recoveriesCounter = stats.NewCountersWithSingleLabel("RecoveriesCount", "Count of the different recoveries performed", "RecoveryType", actionableRecoveriesNames...)

Expand All @@ -77,6 +82,10 @@ var (

// recoveriesFailureCounter counts the number of failed recoveries that VTOrc has performed
recoveriesFailureCounter = stats.NewCountersWithSingleLabel("FailedRecoveries", "Count of the different failed recoveries performed", "RecoveryType", actionableRecoveriesNames...)

// shardLockTimings measures the timing of shard lock and unlock operations performed via LockShard; shardLockTimingsActions lists its action labels.
shardLockTimingsActions = []string{"Lock", "Unlock"}
shardLockTimings = stats.NewTimings("ShardLockTimings", "Timings of global shard locks", "Action", shardLockTimingsActions...)
)

// recoveryFunction is the code of the recovery function to be used
Expand Down Expand Up @@ -144,13 +153,56 @@ func NewTopologyRecoveryStep(id int64, message string) *TopologyRecoveryStep {
}

func init() {
// ShardLocksActive is a stats representation of shardsLockCounter.
stats.NewGaugeFunc("ShardLocksActive", "Number of actively-held shard locks", func() int64 {
return atomic.LoadInt64(&shardsLockCounter)
})
go initializeTopologyRecoveryPostConfiguration()
}

// initializeTopologyRecoveryPostConfiguration calls
// config.WaitForConfigurationToBeLoaded and returns once that call returns.
// It is started in its own goroutine from init.
// NOTE(review): presumably the wait blocks until configuration is loaded — confirm in the config package.
func initializeTopologyRecoveryPostConfiguration() {
	config.WaitForConfigurationToBeLoaded()
}

// getLockAction returns the lock action description used for a VTOrc
// recovery: the analysis code followed by the analysed instance.
func getLockAction(analysedInstance string, code inst.AnalysisCode) string {
	action := fmt.Sprintf("VTOrc Recovery for %v on %v", code, analysedInstance)
	return action
}

// LockShard acquires the topo lock for the given keyspace-shard so that no
// other actor performs a conflicting action while it is held.
//
// It fails without attempting the lock when keyspace or shard is empty, or
// once hasReceivedSIGTERM has been set. Every attempt that gets past those
// checks records a "Lock" timing, whether or not the lock was obtained.
//
// On success it returns the context handed back by ts.TryLockShard together
// with an unlock function. Calling that function releases the lock, then
// decrements shardsLockCounter and records an "Unlock" timing.
func LockShard(ctx context.Context, keyspace, shard, lockAction string) (context.Context, func(*error), error) {
	switch {
	case keyspace == "":
		return nil, nil, errors.New("can't lock shard: keyspace is unspecified")
	case shard == "":
		return nil, nil, errors.New("can't lock shard: shard name is unspecified")
	case atomic.LoadInt32(&hasReceivedSIGTERM) > 0:
		return nil, nil, errors.New("can't lock shard: SIGTERM received")
	}

	lockStart := time.Now()
	defer func() {
		shardLockTimings.Add("Lock", time.Since(lockStart))
	}()

	// Count the lock as in flight before attempting it; roll back on failure.
	atomic.AddInt64(&shardsLockCounter, 1)
	lockCtx, release, err := ts.TryLockShard(ctx, keyspace, shard, lockAction)
	if err != nil {
		atomic.AddInt64(&shardsLockCounter, -1)
		return nil, nil, err
	}

	unlockShard := func(errPtr *error) {
		unlockStart := time.Now()
		// Deferred so the bookkeeping happens after the release completes.
		defer func() {
			atomic.AddInt64(&shardsLockCounter, -1)
			shardLockTimings.Add("Unlock", time.Since(unlockStart))
		}()
		release(errPtr)
	}
	return lockCtx, unlockShard, nil
}

// AuditTopologyRecovery audits a single step in a topology recovery process.
func AuditTopologyRecovery(topologyRecovery *TopologyRecovery, message string) error {
log.Infof("topology_recovery: %s", message)
Expand Down
10 changes: 9 additions & 1 deletion go/vt/vtorc/logic/vtorc.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ var (
discoveryRecentCountGauge = stats.NewGauge("DiscoveriesRecentCount", "Number of recent discoveries")
discoveryWorkersGauge = stats.NewGauge("DiscoveryWorkers", "Number of discovery workers")
discoveryWorkersActiveGauge = stats.NewGauge("DiscoveryWorkersActive", "Number of discovery workers actively discovering tablets")

discoverInstanceTimingsActions = []string{"Backend", "Instance", "Other"}
discoverInstanceTimings = stats.NewTimings("DiscoverInstanceTimings", "Timings for instance discovery actions", "Action", discoverInstanceTimingsActions...)
)

var discoveryMetrics = collection.CreateOrReturnCollection(DiscoveryMetricsName)
Expand Down Expand Up @@ -118,7 +121,7 @@ func closeVTOrc() {
func waitForLocksRelease() {
timeout := time.After(shutdownWaitTime)
for {
count := atomic.LoadInt32(&shardsLockCounter)
count := atomic.LoadInt64(&shardsLockCounter)
if count == 0 {
break
}
Expand Down Expand Up @@ -215,6 +218,11 @@ func DiscoverInstance(tabletAlias string, forceDiscovery bool) {
totalLatency := latency.Elapsed("total")
backendLatency := latency.Elapsed("backend")
instanceLatency := latency.Elapsed("instance")
otherLatency := totalLatency - (backendLatency + instanceLatency)

discoverInstanceTimings.Add("Backend", backendLatency)
discoverInstanceTimings.Add("Instance", instanceLatency)
discoverInstanceTimings.Add("Other", otherLatency)

if forceDiscovery {
log.Infof("Force discovered - %+v, err - %v", instance, err)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtorc/logic/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func TestWaitForLocksRelease(t *testing.T) {

t.Run("Timeout from shutdownWaitTime", func(t *testing.T) {
// Increment shardsLockCounter to simulate locking of a shard
atomic.AddInt32(&shardsLockCounter, +1)
atomic.AddInt64(&shardsLockCounter, +1)
defer func() {
// Restore the initial value
atomic.StoreInt32(&shardsLockCounter, 0)
atomic.StoreInt64(&shardsLockCounter, 0)
}()
shutdownWaitTime = 200 * time.Millisecond
timeSpent := waitForLocksReleaseAndGetTimeWaitedFor()
Expand All @@ -42,12 +42,12 @@ func TestWaitForLocksRelease(t *testing.T) {

t.Run("Successful wait for locks release", func(t *testing.T) {
// Increment shardsLockCounter to simulate locking of a shard
atomic.AddInt32(&shardsLockCounter, +1)
atomic.AddInt64(&shardsLockCounter, +1)
shutdownWaitTime = 500 * time.Millisecond
// Release the locks after 200 milliseconds
go func() {
time.Sleep(200 * time.Millisecond)
atomic.StoreInt32(&shardsLockCounter, 0)
atomic.StoreInt64(&shardsLockCounter, 0)
}()
timeSpent := waitForLocksReleaseAndGetTimeWaitedFor()
assert.Greater(t, timeSpent, 100*time.Millisecond, "waitForLocksRelease should wait for the locks and not return early")
Expand Down
Loading