Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/test/endtoend/cluster/vtorc_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type VTOrcConfiguration struct {
MySQLReplicaUser string
MySQLReplicaPassword string
RecoveryPeriodBlockSeconds int
TopologyRefreshSeconds int `json:",omitempty"`
PreventCrossDataCenterPrimaryFailover bool `json:",omitempty"`
LockShardTimeoutSeconds int `json:",omitempty"`
ReplicationLagQuery string `json:",omitempty"`
Expand Down
53 changes: 38 additions & 15 deletions go/test/endtoend/vtorc/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,27 @@ import (
"vitess.io/vitess/go/test/endtoend/vtorc/utils"
)

// make an api call to /api/problems endpoint
// and verify the output
func TestProblemsAPI(t *testing.T) {
// TestAPIEndpoints tests the various API endpoints that VTOrc offers.
func TestAPIEndpoints(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 1, nil, cluster.VTOrcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
RecoveryPeriodBlockSeconds: 5,
// The default topo refresh time is 3 seconds. We are intentionally making it slower for the test, so that we have time to verify
// the /debug/health output before and after the first refresh runs.
TopologyRefreshSeconds: 10,
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0]
// Call API with retry to ensure VTOrc is up
status, resp := utils.MakeAPICallRetry(t, vtorc, "/debug/health", func(code int, response string) bool {
return code == 0
})
// When VTOrc is up and hasn't run the topo-refresh, is should be healthy but HasDiscovered should be false.
assert.Equal(t, 500, status)
assert.Contains(t, resp, `"Healthy": true,`)
assert.Contains(t, resp, `"DiscoveredOnce": false`)

// find primary from topo
primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)
Expand All @@ -63,14 +73,17 @@ func TestProblemsAPI(t *testing.T) {

t.Run("Health API", func(t *testing.T) {
// Check that VTOrc is healthy
status, resp := utils.MakeAPICall(t, vtorc, "/debug/health")
status, resp, err := utils.MakeAPICall(t, vtorc, "/debug/health")
require.NoError(t, err)
assert.Equal(t, 200, status)
assert.Contains(t, resp, `"Healthy": true,`)
assert.Contains(t, resp, `"DiscoveredOnce": true`)
})

t.Run("Liveness API", func(t *testing.T) {
// Check that VTOrc is live
status, resp := utils.MakeAPICall(t, vtorc, "/debug/liveness")
status, resp, err := utils.MakeAPICall(t, vtorc, "/debug/liveness")
require.NoError(t, err)
assert.Equal(t, 200, status)
assert.Empty(t, resp)
})
Expand All @@ -82,7 +95,8 @@ func TestProblemsAPI(t *testing.T) {

t.Run("Disable Recoveries API", func(t *testing.T) {
// Disable recoveries of VTOrc
status, resp := utils.MakeAPICall(t, vtorc, "/api/disable-global-recoveries")
status, resp, err := utils.MakeAPICall(t, vtorc, "/api/disable-global-recoveries")
require.NoError(t, err)
assert.Equal(t, 200, status)
assert.Equal(t, "Global recoveries disabled\n", resp)
})
Expand All @@ -103,29 +117,34 @@ func TestProblemsAPI(t *testing.T) {
assert.Contains(t, resp, `"Analysis": "ReplicationStopped"`)

// Verify that filtering also works in the API as intended
status, resp = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=0")
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=0")
require.NoError(t, err)
assert.Equal(t, 200, status, resp)
assert.Contains(t, resp, fmt.Sprintf(`"Port": %d`, replica.MySQLPort))

// Verify that filtering by keyspace also works in the API as intended
status, resp = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks")
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks")
require.NoError(t, err)
assert.Equal(t, 200, status, resp)
assert.Contains(t, resp, fmt.Sprintf(`"Port": %d`, replica.MySQLPort))

// Check that filtering using keyspace and shard works
status, resp = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=80-")
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=80-")
require.NoError(t, err)
assert.Equal(t, 200, status, resp)
assert.Equal(t, "[]", resp)

// Check that filtering using just the shard fails
status, resp = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?shard=0")
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?shard=0")
require.NoError(t, err)
assert.Equal(t, 400, status, resp)
assert.Equal(t, "Filtering by shard without keyspace isn't supported\n", resp)
})

t.Run("Enable Recoveries API", func(t *testing.T) {
// Enable recoveries of VTOrc
status, resp := utils.MakeAPICall(t, vtorc, "/api/enable-global-recoveries")
status, resp, err := utils.MakeAPICall(t, vtorc, "/api/enable-global-recoveries")
require.NoError(t, err)
assert.Equal(t, 200, status)
assert.Equal(t, "Global recoveries enabled\n", resp)

Expand Down Expand Up @@ -156,22 +175,26 @@ func TestProblemsAPI(t *testing.T) {
assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias))

// Check that filtering using keyspace and shard works
status, resp = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks&shard=0")
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks&shard=0")
require.NoError(t, err)
assert.Equal(t, 200, status, resp)
assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias))

// Check that filtering using keyspace works
status, resp = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks")
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks")
require.NoError(t, err)
assert.Equal(t, 200, status, resp)
assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias))

// Check that filtering using keyspace and shard works
status, resp = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks&shard=80-")
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks&shard=80-")
require.NoError(t, err)
assert.Equal(t, 200, status, resp)
assert.Equal(t, "null", resp)

// Check that filtering using just the shard fails
status, resp = utils.MakeAPICall(t, vtorc, "/api/problems?shard=0")
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/problems?shard=0")
require.NoError(t, err)
assert.Equal(t, 400, status, resp)
assert.Equal(t, "Filtering by shard without keyspace isn't supported\n", resp)
})
Expand Down
8 changes: 3 additions & 5 deletions go/test/endtoend/vtorc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,12 +737,10 @@ func CheckSourcePort(t *testing.T, replica *cluster.Vttablet, source *cluster.Vt
}

// MakeAPICall is used make an API call given the url. It returns the status and the body of the response received
func MakeAPICall(t *testing.T, vtorc *cluster.VTOrcProcess, url string) (status int, response string) {
func MakeAPICall(t *testing.T, vtorc *cluster.VTOrcProcess, url string) (status int, response string, err error) {
t.Helper()
var err error
status, response, err = vtorc.MakeAPICall(url)
require.NoError(t, err)
return status, response
return status, response, err
}

// MakeAPICallRetry is used to make an API call and retry on the given condition.
Expand All @@ -756,7 +754,7 @@ func MakeAPICallRetry(t *testing.T, vtorc *cluster.VTOrcProcess, url string, ret
t.Fatal("timed out waiting for api to work")
return
default:
status, response = MakeAPICall(t, vtorc, url)
status, response, _ := MakeAPICall(t, vtorc, url)
if retry(status, response) {
time.Sleep(1 * time.Second)
break
Expand Down
27 changes: 22 additions & 5 deletions go/vt/vtorc/logic/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ func waitForLocksRelease() {
// instance discovery per entry.
func handleDiscoveryRequests() {
discoveryQueue = discovery.CreateOrReturnQueue("DEFAULT")

// create a pool of discovery workers
for i := uint(0); i < config.DiscoveryMaxConcurrency; i++ {
go func() {
Expand All @@ -178,7 +177,7 @@ func handleDiscoveryRequests() {
}

// DiscoverInstance will attempt to discover (poll) an instance (unless
// it is already up to date) and will also ensure that its primary and
// it is already up-to-date) and will also ensure that its primary and
// replicas (if any) are also checked.
func DiscoverInstance(instanceKey inst.InstanceKey, forceDiscovery bool) {
if inst.InstanceIsForgotten(&instanceKey) {
Expand Down Expand Up @@ -272,7 +271,6 @@ func DiscoverInstance(instanceKey inst.InstanceKey, forceDiscovery bool) {
// onHealthTick handles the actions to take to discover/poll instances
func onHealthTick() {
wasAlreadyElected := IsLeader()

{
myIsElectedNode, err := process.AttemptElection()
if err != nil {
Expand Down Expand Up @@ -438,8 +436,27 @@ func ContinuousDiscovery() {
}
}()
case <-tabletTopoTick:
go RefreshAllKeyspaces()
go refreshAllTablets()
// Create a wait group
var wg sync.WaitGroup

// Refresh all keyspace information.
wg.Add(1)
go func() {
defer wg.Done()
RefreshAllKeyspaces()
}()

// Refresh all tablets.
wg.Add(1)
go func() {
defer wg.Done()
refreshAllTablets()
}()

// Wait for both the refreshes to complete
wg.Wait()
// We have completed one discovery cycle in the entirety of it. We should update the process health.
process.FirstDiscoveryCycleComplete.Store(true)
}
}
}
3 changes: 3 additions & 0 deletions go/vt/vtorc/process/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
var lastHealthCheckUnixNano int64
var lastGoodHealthCheckUnixNano int64
var LastContinousCheckHealthy int64
var FirstDiscoveryCycleComplete atomic.Bool

var lastHealthCheckCache = cache.New(config.HealthPollSeconds*time.Second, time.Second)

Expand Down Expand Up @@ -73,6 +74,7 @@ type HealthStatus struct {
Hostname string
Token string
IsActiveNode bool
DiscoveredOnce bool
ActiveNode *NodeHealth
Error error
AvailableNodes [](*NodeHealth)
Expand Down Expand Up @@ -119,6 +121,7 @@ func HealthTest() (health *HealthStatus, err error) {
return health, err
}
health.Healthy = healthy
health.DiscoveredOnce = FirstDiscoveryCycleComplete.Load()

if health.ActiveNode, health.IsActiveNode, err = ElectedNode(); err != nil {
health.Error = err
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtorc/server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ func healthAPIHandler(response http.ResponseWriter, request *http.Request) {
return
}
code := http.StatusOK
if !health.Healthy {
// If the process isn't healthy, or if the first discovery cycle hasn't completed, we return an internal server error.
if !health.Healthy || !health.DiscoveredOnce {
code = http.StatusInternalServerError
}
returnAsJSON(response, code, health)
Expand Down