diff --git a/go/test/endtoend/cluster/vtorc_process.go b/go/test/endtoend/cluster/vtorc_process.go index 882b716208a..38f259d8945 100644 --- a/go/test/endtoend/cluster/vtorc_process.go +++ b/go/test/endtoend/cluster/vtorc_process.go @@ -55,6 +55,7 @@ type VTOrcConfiguration struct { MySQLReplicaUser string MySQLReplicaPassword string RecoveryPeriodBlockSeconds int + TopologyRefreshSeconds int `json:",omitempty"` PreventCrossDataCenterPrimaryFailover bool `json:",omitempty"` LockShardTimeoutSeconds int `json:",omitempty"` ReplicationLagQuery string `json:",omitempty"` diff --git a/go/test/endtoend/vtorc/api/api_test.go b/go/test/endtoend/vtorc/api/api_test.go index 4885a67aa9c..3af5725d3e9 100644 --- a/go/test/endtoend/vtorc/api/api_test.go +++ b/go/test/endtoend/vtorc/api/api_test.go @@ -28,17 +28,27 @@ import ( "vitess.io/vitess/go/test/endtoend/vtorc/utils" ) -// make an api call to /api/problems endpoint -// and verify the output -func TestProblemsAPI(t *testing.T) { +// TestAPIEndpoints tests the various API endpoints that VTOrc offers. +func TestAPIEndpoints(t *testing.T) { defer cluster.PanicHandler(t) utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 1, nil, cluster.VTOrcConfiguration{ PreventCrossDataCenterPrimaryFailover: true, RecoveryPeriodBlockSeconds: 5, + // The default topo refresh time is 3 seconds. We are intentionally making it slower for the test, so that we have time to verify + // the /debug/health output before and after the first refresh runs. + TopologyRefreshSeconds: 10, }, 1, "") keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0] + // Call API with retry to ensure VTOrc is up + status, resp := utils.MakeAPICallRetry(t, vtorc, "/debug/health", func(code int, response string) bool { + return code == 0 + }) + // When VTOrc is up and hasn't run the topo-refresh, is should be healthy but HasDiscovered should be false. + assert.Equal(t, 500, status) + assert.Contains(t, resp, `"Healthy": true,`) + assert.Contains(t, resp, `"DiscoveredOnce": false`) // find primary from topo primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) @@ -63,14 +73,17 @@ func TestProblemsAPI(t *testing.T) { t.Run("Health API", func(t *testing.T) { // Check that VTOrc is healthy - status, resp := utils.MakeAPICall(t, vtorc, "/debug/health") + status, resp, err := utils.MakeAPICall(t, vtorc, "/debug/health") + require.NoError(t, err) assert.Equal(t, 200, status) assert.Contains(t, resp, `"Healthy": true,`) + assert.Contains(t, resp, `"DiscoveredOnce": true`) }) t.Run("Liveness API", func(t *testing.T) { // Check that VTOrc is live - status, resp := utils.MakeAPICall(t, vtorc, "/debug/liveness") + status, resp, err := utils.MakeAPICall(t, vtorc, "/debug/liveness") + require.NoError(t, err) assert.Equal(t, 200, status) assert.Empty(t, resp) }) @@ -82,7 +95,8 @@ func TestProblemsAPI(t *testing.T) { t.Run("Disable Recoveries API", func(t *testing.T) { // Disable recoveries of VTOrc - status, resp := utils.MakeAPICall(t, vtorc, "/api/disable-global-recoveries") + status, resp, err := utils.MakeAPICall(t, vtorc, "/api/disable-global-recoveries") + require.NoError(t, err) assert.Equal(t, 200, status) assert.Equal(t, "Global recoveries disabled\n", resp) }) @@ -103,29 +117,34 @@ func TestProblemsAPI(t *testing.T) { assert.Contains(t, resp, `"Analysis": "ReplicationStopped"`) // Verify that filtering also works in the API as intended - status, resp = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=0") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=0") + require.NoError(t, err) assert.Equal(t, 200, status, resp) assert.Contains(t, resp, fmt.Sprintf(`"Port": %d`, replica.MySQLPort)) // Verify that filtering by keyspace also works in the API as intended - status, resp = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks") + require.NoError(t, err) assert.Equal(t, 200, status, resp) assert.Contains(t, resp, fmt.Sprintf(`"Port": %d`, replica.MySQLPort)) // Check that filtering using keyspace and shard works - status, resp = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=80-") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=80-") + require.NoError(t, err) assert.Equal(t, 200, status, resp) assert.Equal(t, "[]", resp) // Check that filtering using just the shard fails - status, resp = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?shard=0") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?shard=0") + require.NoError(t, err) assert.Equal(t, 400, status, resp) assert.Equal(t, "Filtering by shard without keyspace isn't supported\n", resp) }) t.Run("Enable Recoveries API", func(t *testing.T) { // Enable recoveries of VTOrc - status, resp := utils.MakeAPICall(t, vtorc, "/api/enable-global-recoveries") + status, resp, err := utils.MakeAPICall(t, vtorc, "/api/enable-global-recoveries") + require.NoError(t, err) assert.Equal(t, 200, status) assert.Equal(t, "Global recoveries enabled\n", resp) @@ -156,22 +175,26 @@ func TestProblemsAPI(t *testing.T) { assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias)) // Check that filtering using keyspace and shard works - status, resp = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks&shard=0") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks&shard=0") + require.NoError(t, err) assert.Equal(t, 200, status, resp) assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias)) // Check that filtering using keyspace works - status, resp = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks") + require.NoError(t, err) assert.Equal(t, 200, status, resp) assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias)) // Check that filtering using keyspace and shard works - status, resp = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks&shard=80-") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks&shard=80-") + require.NoError(t, err) assert.Equal(t, 200, status, resp) assert.Equal(t, "null", resp) // Check that filtering using just the shard fails - status, resp = utils.MakeAPICall(t, vtorc, "/api/problems?shard=0") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/problems?shard=0") + require.NoError(t, err) assert.Equal(t, 400, status, resp) assert.Equal(t, "Filtering by shard without keyspace isn't supported\n", resp) }) diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index fc118ef3ac5..e5d7664e491 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -737,12 +737,10 @@ func CheckSourcePort(t *testing.T, replica *cluster.Vttablet, source *cluster.Vt } // MakeAPICall is used make an API call given the url. It returns the status and the body of the response received -func MakeAPICall(t *testing.T, vtorc *cluster.VTOrcProcess, url string) (status int, response string) { +func MakeAPICall(t *testing.T, vtorc *cluster.VTOrcProcess, url string) (status int, response string, err error) { t.Helper() - var err error status, response, err = vtorc.MakeAPICall(url) - require.NoError(t, err) - return status, response + return status, response, err } // MakeAPICallRetry is used to make an API call and retry on the given condition. @@ -756,7 +754,7 @@ func MakeAPICallRetry(t *testing.T, vtorc *cluster.VTOrcProcess, url string, ret t.Fatal("timed out waiting for api to work") return default: - status, response = MakeAPICall(t, vtorc, url) + status, response, _ := MakeAPICall(t, vtorc, url) if retry(status, response) { time.Sleep(1 * time.Second) break diff --git a/go/vt/vtorc/logic/orchestrator.go b/go/vt/vtorc/logic/orchestrator.go index dcc30027392..4fd36c031bd 100644 --- a/go/vt/vtorc/logic/orchestrator.go +++ b/go/vt/vtorc/logic/orchestrator.go @@ -155,7 +155,6 @@ func waitForLocksRelease() { // instance discovery per entry. func handleDiscoveryRequests() { discoveryQueue = discovery.CreateOrReturnQueue("DEFAULT") - // create a pool of discovery workers for i := uint(0); i < config.DiscoveryMaxConcurrency; i++ { go func() { @@ -178,7 +177,7 @@ func handleDiscoveryRequests() { } // DiscoverInstance will attempt to discover (poll) an instance (unless -// it is already up to date) and will also ensure that its primary and +// it is already up-to-date) and will also ensure that its primary and // replicas (if any) are also checked. func DiscoverInstance(instanceKey inst.InstanceKey, forceDiscovery bool) { if inst.InstanceIsForgotten(&instanceKey) { @@ -272,7 +271,6 @@ func DiscoverInstance(instanceKey inst.InstanceKey, forceDiscovery bool) { // onHealthTick handles the actions to take to discover/poll instances func onHealthTick() { wasAlreadyElected := IsLeader() - { myIsElectedNode, err := process.AttemptElection() if err != nil { @@ -438,8 +436,27 @@ func ContinuousDiscovery() { } }() case <-tabletTopoTick: - go RefreshAllKeyspaces() - go refreshAllTablets() + // Create a wait group + var wg sync.WaitGroup + + // Refresh all keyspace information. + wg.Add(1) + go func() { + defer wg.Done() + RefreshAllKeyspaces() + }() + + // Refresh all tablets. + wg.Add(1) + go func() { + defer wg.Done() + refreshAllTablets() + }() + + // Wait for both the refreshes to complete + wg.Wait() + // We have completed one discovery cycle in the entirety of it. We should update the process health. + process.FirstDiscoveryCycleComplete.Store(true) } } } diff --git a/go/vt/vtorc/process/health.go b/go/vt/vtorc/process/health.go index 9f25fe51a39..22db89e1d56 100644 --- a/go/vt/vtorc/process/health.go +++ b/go/vt/vtorc/process/health.go @@ -32,6 +32,7 @@ import ( var lastHealthCheckUnixNano int64 var lastGoodHealthCheckUnixNano int64 var LastContinousCheckHealthy int64 +var FirstDiscoveryCycleComplete atomic.Bool var lastHealthCheckCache = cache.New(config.HealthPollSeconds*time.Second, time.Second) @@ -73,6 +74,7 @@ type HealthStatus struct { Hostname string Token string IsActiveNode bool + DiscoveredOnce bool ActiveNode *NodeHealth Error error AvailableNodes [](*NodeHealth) @@ -119,6 +121,7 @@ func HealthTest() (health *HealthStatus, err error) { return health, err } health.Healthy = healthy + health.DiscoveredOnce = FirstDiscoveryCycleComplete.Load() if health.ActiveNode, health.IsActiveNode, err = ElectedNode(); err != nil { health.Error = err diff --git a/go/vt/vtorc/server/api.go b/go/vt/vtorc/server/api.go index 1a7f0a1c1da..ade523486df 100644 --- a/go/vt/vtorc/server/api.go +++ b/go/vt/vtorc/server/api.go @@ -184,7 +184,8 @@ func healthAPIHandler(response http.ResponseWriter, request *http.Request) { return } code := http.StatusOK - if !health.Healthy { + // If the process isn't healthy, or if the first discovery cycle hasn't completed, we return an internal server error. + if !health.Healthy || !health.DiscoveredOnce { code = http.StatusInternalServerError } returnAsJSON(response, code, health)