From ce7d7fb8bd87d6a790ee6bc8cc170aecee11947a Mon Sep 17 00:00:00 2001 From: Rameez Sajwani Date: Thu, 20 Apr 2023 11:08:12 -0700 Subject: [PATCH 1/7] add new field to health status Signed-off-by: Rameez Sajwani --- go/test/endtoend/cluster/vtorc_process.go | 2 + go/test/endtoend/vtorc/api/api_test.go | 82 +++++++++++++++++++---- go/test/endtoend/vtorc/utils/utils.go | 9 ++- go/vt/vtorc/logic/orchestrator.go | 7 +- go/vt/vtorc/process/health.go | 3 + 5 files changed, 83 insertions(+), 20 deletions(-) diff --git a/go/test/endtoend/cluster/vtorc_process.go b/go/test/endtoend/cluster/vtorc_process.go index 882b716208a..469b8f6ff98 100644 --- a/go/test/endtoend/cluster/vtorc_process.go +++ b/go/test/endtoend/cluster/vtorc_process.go @@ -55,6 +55,7 @@ type VTOrcConfiguration struct { MySQLReplicaUser string MySQLReplicaPassword string RecoveryPeriodBlockSeconds int + TopoInformationRefreshSeconds int PreventCrossDataCenterPrimaryFailover bool `json:",omitempty"` LockShardTimeoutSeconds int `json:",omitempty"` ReplicationLagQuery string `json:",omitempty"` @@ -77,6 +78,7 @@ func (config *VTOrcConfiguration) AddDefaults(webPort int) { config.RecoveryPeriodBlockSeconds = 1 } config.ListenAddress = fmt.Sprintf(":%d", webPort) + config.TopoInformationRefreshSeconds = 3 } // Setup starts orc process with required arguements diff --git a/go/test/endtoend/vtorc/api/api_test.go b/go/test/endtoend/vtorc/api/api_test.go index 4885a67aa9c..560ee8004b0 100644 --- a/go/test/endtoend/vtorc/api/api_test.go +++ b/go/test/endtoend/vtorc/api/api_test.go @@ -18,14 +18,17 @@ package api import ( "fmt" + "strings" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/json2" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/vtorc/utils" + "vitess.io/vitess/go/vt/vtorc/process" ) // make an api call to /api/problems endpoint @@ -63,14 +66,17 @@ func TestProblemsAPI(t *testing.T) { t.Run("Health API", func(t *testing.T) { // Check that VTOrc is healthy - status, resp := utils.MakeAPICall(t, vtorc, "/debug/health") + status, resp, err := utils.MakeAPICall(t, vtorc, "/debug/health") + require.NoError(t, err) assert.Equal(t, 200, status) + require.NoError(t, err) assert.Contains(t, resp, `"Healthy": true,`) }) t.Run("Liveness API", func(t *testing.T) { // Check that VTOrc is live - status, resp := utils.MakeAPICall(t, vtorc, "/debug/liveness") + status, resp, err := utils.MakeAPICall(t, vtorc, "/debug/liveness") + require.NoError(t, err) assert.Equal(t, 200, status) assert.Empty(t, resp) }) @@ -82,7 +88,8 @@ func TestProblemsAPI(t *testing.T) { t.Run("Disable Recoveries API", func(t *testing.T) { // Disable recoveries of VTOrc - status, resp := utils.MakeAPICall(t, vtorc, "/api/disable-global-recoveries") + status, resp, err := utils.MakeAPICall(t, vtorc, "/api/disable-global-recoveries") + require.NoError(t, err) assert.Equal(t, 200, status) assert.Equal(t, "Global recoveries disabled\n", resp) }) @@ -103,29 +110,34 @@ func TestProblemsAPI(t *testing.T) { assert.Contains(t, resp, `"Analysis": "ReplicationStopped"`) // Verify that filtering also works in the API as intended - status, resp = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=0") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=0") + require.NoError(t, err) assert.Equal(t, 200, status, resp) assert.Contains(t, resp, fmt.Sprintf(`"Port": %d`, replica.MySQLPort)) // Verify that filtering by keyspace also works in the API as intended - status, resp = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks") + require.NoError(t, err) assert.Equal(t, 200, status, resp) assert.Contains(t, resp, fmt.Sprintf(`"Port": %d`, replica.MySQLPort)) // Check that filtering using keyspace and shard works - status, resp = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=80-") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=80-") + require.NoError(t, err) assert.Equal(t, 200, status, resp) assert.Equal(t, "[]", resp) // Check that filtering using just the shard fails - status, resp = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?shard=0") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?shard=0") + require.NoError(t, err) assert.Equal(t, 400, status, resp) assert.Equal(t, "Filtering by shard without keyspace isn't supported\n", resp) }) t.Run("Enable Recoveries API", func(t *testing.T) { // Enable recoveries of VTOrc - status, resp := utils.MakeAPICall(t, vtorc, "/api/enable-global-recoveries") + status, resp, err := utils.MakeAPICall(t, vtorc, "/api/enable-global-recoveries") + require.NoError(t, err) assert.Equal(t, 200, status) assert.Equal(t, "Global recoveries enabled\n", resp) @@ -156,23 +168,69 @@ func TestProblemsAPI(t *testing.T) { assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias)) // Check that filtering using keyspace and shard works - status, resp = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks&shard=0") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks&shard=0") + require.NoError(t, err) assert.Equal(t, 200, status, resp) assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias)) // Check that filtering using keyspace works - status, resp = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks") + require.NoError(t, err) assert.Equal(t, 200, status, resp) assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias)) // Check that filtering using keyspace and shard works - status, resp = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks&shard=80-") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/problems?keyspace=ks&shard=80-") + require.NoError(t, err) assert.Equal(t, 200, status, resp) assert.Equal(t, "null", resp) // Check that filtering using just the shard fails - status, resp = utils.MakeAPICall(t, vtorc, "/api/problems?shard=0") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/problems?shard=0") + require.NoError(t, err) assert.Equal(t, 400, status, resp) assert.Equal(t, "Filtering by shard without keyspace isn't supported\n", resp) }) } + +// TestIfDiscoveringHappenedLaterInHealthCheck checks that `IsDiscovering` flag in `HealthStatus` turns `true`, sometime +// after flag `Healthy` turns `true`. +func TestIfDiscoveringHappenedLaterInHealthCheck(t *testing.T) { + defer cluster.PanicHandler(t) + utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 1, nil, cluster.VTOrcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + RecoveryPeriodBlockSeconds: 5, + }, 1, "") + vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0] + // Call API with retry to ensure health service is up + status, resp := utils.MakeAPICallRetry(t, vtorc, "/debug/health", func(code int, response string) bool { + return code == 0 && (strings.Contains(response, "connection refused") || strings.Contains(response, "")) + }) + assert.Equal(t, 200, status) + // Since TopoInformationRefreshSeconds is default to 3s for this test. This will ensure there will ~3 seconds + // delay between `Healthy` and `IsDiscovering` to turn `true` + assert.Contains(t, resp, `"Healthy": true,`) + assert.Contains(t, resp, `"IsDiscovering": false`) + startTime := time.Now() + for { + // Check that VTOrc is healthy + status, resp, err := utils.MakeAPICall(t, vtorc, "/debug/health") + require.NoError(t, err) + assert.Equal(t, 200, status) + var healthStatus process.HealthStatus + err = json2.Unmarshal([]byte(resp), &healthStatus) + assert.Nil(t, err) + assert.Equal(t, healthStatus.Healthy, true) + if healthStatus.IsDiscovering { + timeForIsRecovering := time.Now() + require.Greater(t, timeForIsRecovering, startTime) + return + } + if time.Since(startTime) > 10*time.Second { + // time out + t.Fatal("timed out waiting for `timeForIsRecovering` to turned `true`") + return + } + time.Sleep(1 * time.Second) + } +} diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index fc118ef3ac5..106e2c37e9e 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -737,12 +737,10 @@ func CheckSourcePort(t *testing.T, replica *cluster.Vttablet, source *cluster.Vt } // MakeAPICall is used make an API call given the url. It returns the status and the body of the response received -func MakeAPICall(t *testing.T, vtorc *cluster.VTOrcProcess, url string) (status int, response string) { +func MakeAPICall(t *testing.T, vtorc *cluster.VTOrcProcess, url string) (status int, response string, err error) { t.Helper() - var err error status, response, err = vtorc.MakeAPICall(url) - require.NoError(t, err) - return status, response + return status, response, err } // MakeAPICallRetry is used to make an API call and retry on the given condition. @@ -756,7 +754,8 @@ func MakeAPICallRetry(t *testing.T, vtorc *cluster.VTOrcProcess, url string, ret t.Fatal("timed out waiting for api to work") return default: - status, response = MakeAPICall(t, vtorc, url) + status, response, err := MakeAPICall(t, vtorc, url) + require.NoError(t, err) if retry(status, response) { time.Sleep(1 * time.Second) break diff --git a/go/vt/vtorc/logic/orchestrator.go b/go/vt/vtorc/logic/orchestrator.go index dcc30027392..a363bfa7927 100644 --- a/go/vt/vtorc/logic/orchestrator.go +++ b/go/vt/vtorc/logic/orchestrator.go @@ -155,7 +155,6 @@ func waitForLocksRelease() { // instance discovery per entry. func handleDiscoveryRequests() { discoveryQueue = discovery.CreateOrReturnQueue("DEFAULT") - // create a pool of discovery workers for i := uint(0); i < config.DiscoveryMaxConcurrency; i++ { go func() { @@ -178,7 +177,7 @@ func handleDiscoveryRequests() { } // DiscoverInstance will attempt to discover (poll) an instance (unless -// it is already up to date) and will also ensure that its primary and +// it is already up-to-date) and will also ensure that its primary and // replicas (if any) are also checked. func DiscoverInstance(instanceKey inst.InstanceKey, forceDiscovery bool) { if inst.InstanceIsForgotten(&instanceKey) { @@ -267,12 +266,14 @@ func DiscoverInstance(instanceKey inst.InstanceKey, forceDiscovery bool) { InstanceLatency: instanceLatency, Err: nil, }) + // we turn on HitAtLeastOneDiscovery first time + // con: this will result in extra memory hit for every recovery cycle. + process.HitAtLeastOneDiscovery.CompareAndSwap(false, true) } // onHealthTick handles the actions to take to discover/poll instances func onHealthTick() { wasAlreadyElected := IsLeader() - { myIsElectedNode, err := process.AttemptElection() if err != nil { diff --git a/go/vt/vtorc/process/health.go b/go/vt/vtorc/process/health.go index 9f25fe51a39..494cfcf7d2d 100644 --- a/go/vt/vtorc/process/health.go +++ b/go/vt/vtorc/process/health.go @@ -32,6 +32,7 @@ import ( var lastHealthCheckUnixNano int64 var lastGoodHealthCheckUnixNano int64 var LastContinousCheckHealthy int64 +var HitAtLeastOneDiscovery atomic.Bool var lastHealthCheckCache = cache.New(config.HealthPollSeconds*time.Second, time.Second) @@ -73,6 +74,7 @@ type HealthStatus struct { Hostname string Token string IsActiveNode bool + IsDiscovering bool ActiveNode *NodeHealth Error error AvailableNodes [](*NodeHealth) @@ -119,6 +121,7 @@ func HealthTest() (health *HealthStatus, err error) { return health, err } health.Healthy = healthy + health.IsDiscovering = HitAtLeastOneDiscovery.Load() if health.ActiveNode, health.IsActiveNode, err = ElectedNode(); err != nil { health.Error = err From 34764461412df3249eac2b20f3591c9fbe9fc13c Mon Sep 17 00:00:00 2001 From: Rameez Sajwani Date: Thu, 20 Apr 2023 12:36:06 -0700 Subject: [PATCH 2/7] fix test bug Signed-off-by: Rameez Sajwani --- go/test/endtoend/cluster/vtorc_process.go | 1 - go/test/endtoend/vtorc/api/api_test.go | 1 + go/test/endtoend/vtorc/utils/utils.go | 3 +-- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/go/test/endtoend/cluster/vtorc_process.go b/go/test/endtoend/cluster/vtorc_process.go index 469b8f6ff98..9c47c1a1633 100644 --- a/go/test/endtoend/cluster/vtorc_process.go +++ b/go/test/endtoend/cluster/vtorc_process.go @@ -78,7 +78,6 @@ func (config *VTOrcConfiguration) AddDefaults(webPort int) { config.RecoveryPeriodBlockSeconds = 1 } config.ListenAddress = fmt.Sprintf(":%d", webPort) - config.TopoInformationRefreshSeconds = 3 } // Setup starts orc process with required arguements diff --git a/go/test/endtoend/vtorc/api/api_test.go b/go/test/endtoend/vtorc/api/api_test.go index 560ee8004b0..bbd6e612d04 100644 --- a/go/test/endtoend/vtorc/api/api_test.go +++ b/go/test/endtoend/vtorc/api/api_test.go @@ -200,6 +200,7 @@ func TestIfDiscoveringHappenedLaterInHealthCheck(t *testing.T) { utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 1, nil, cluster.VTOrcConfiguration{ PreventCrossDataCenterPrimaryFailover: true, RecoveryPeriodBlockSeconds: 5, + TopoInformationRefreshSeconds: 5, }, 1, "") vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0] // Call API with retry to ensure health service is up diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index 106e2c37e9e..e5d7664e491 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -754,8 +754,7 @@ func MakeAPICallRetry(t *testing.T, vtorc *cluster.VTOrcProcess, url string, ret t.Fatal("timed out waiting for api to work") return default: - status, response, err := MakeAPICall(t, vtorc, url) - require.NoError(t, err) + status, response, _ := MakeAPICall(t, vtorc, url) if retry(status, response) { time.Sleep(1 * time.Second) break From d3dea5654cf493f4245957cded4bf05c972247a5 Mon Sep 17 00:00:00 2001 From: Rameez Sajwani Date: Fri, 21 Apr 2023 12:11:01 -0700 Subject: [PATCH 3/7] bug fix Signed-off-by: Rameez Sajwani --- go/test/endtoend/cluster/vtorc_process.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/test/endtoend/cluster/vtorc_process.go b/go/test/endtoend/cluster/vtorc_process.go index 9c47c1a1633..7e31a868fb1 100644 --- a/go/test/endtoend/cluster/vtorc_process.go +++ b/go/test/endtoend/cluster/vtorc_process.go @@ -77,6 +77,9 @@ func (config *VTOrcConfiguration) AddDefaults(webPort int) { if config.RecoveryPeriodBlockSeconds == 0 { config.RecoveryPeriodBlockSeconds = 1 } + if config.TopoInformationRefreshSeconds == 0 { + config.TopoInformationRefreshSeconds = 1 + } config.ListenAddress = fmt.Sprintf(":%d", webPort) } From b679c0363662d3c4a36fd5660b06715251bf82fd Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 3 May 2023 15:40:43 +0530 Subject: [PATCH 4/7] feat: fix the issues pointed out in reviews Signed-off-by: Manan Gupta --- go/test/endtoend/cluster/vtorc_process.go | 5 +---- go/test/endtoend/vtorc/api/api_test.go | 3 +-- go/vt/vtorc/logic/orchestrator.go | 26 ++++++++++++++++++----- go/vt/vtorc/process/health.go | 6 +++--- go/vt/vtorc/server/api.go | 3 ++- 5 files changed, 28 insertions(+), 15 deletions(-) diff --git a/go/test/endtoend/cluster/vtorc_process.go b/go/test/endtoend/cluster/vtorc_process.go index 7e31a868fb1..70bfbf90f1f 100644 --- a/go/test/endtoend/cluster/vtorc_process.go +++ b/go/test/endtoend/cluster/vtorc_process.go @@ -55,7 +55,7 @@ type VTOrcConfiguration struct { MySQLReplicaUser string MySQLReplicaPassword string RecoveryPeriodBlockSeconds int - TopoInformationRefreshSeconds int + TopoInformationRefreshSeconds int `json:",omitempty"` PreventCrossDataCenterPrimaryFailover bool `json:",omitempty"` LockShardTimeoutSeconds int `json:",omitempty"` ReplicationLagQuery string `json:",omitempty"` @@ -77,9 +77,6 @@ func (config *VTOrcConfiguration) AddDefaults(webPort int) { if config.RecoveryPeriodBlockSeconds == 0 { config.RecoveryPeriodBlockSeconds = 1 } - if config.TopoInformationRefreshSeconds == 0 { - config.TopoInformationRefreshSeconds = 1 - } config.ListenAddress = fmt.Sprintf(":%d", webPort) } diff --git a/go/test/endtoend/vtorc/api/api_test.go b/go/test/endtoend/vtorc/api/api_test.go index bbd6e612d04..6f8f3c07171 100644 --- a/go/test/endtoend/vtorc/api/api_test.go +++ b/go/test/endtoend/vtorc/api/api_test.go @@ -69,7 +69,6 @@ func TestProblemsAPI(t *testing.T) { status, resp, err := utils.MakeAPICall(t, vtorc, "/debug/health") require.NoError(t, err) assert.Equal(t, 200, status) - require.NoError(t, err) assert.Contains(t, resp, `"Healthy": true,`) }) @@ -222,7 +221,7 @@ func TestIfDiscoveringHappenedLaterInHealthCheck(t *testing.T) { err = json2.Unmarshal([]byte(resp), &healthStatus) assert.Nil(t, err) assert.Equal(t, healthStatus.Healthy, true) - if healthStatus.IsDiscovering { + if healthStatus.HasDiscoveredOnce { timeForIsRecovering := time.Now() require.Greater(t, timeForIsRecovering, startTime) return diff --git a/go/vt/vtorc/logic/orchestrator.go b/go/vt/vtorc/logic/orchestrator.go index a363bfa7927..4fd36c031bd 100644 --- a/go/vt/vtorc/logic/orchestrator.go +++ b/go/vt/vtorc/logic/orchestrator.go @@ -266,9 +266,6 @@ func DiscoverInstance(instanceKey inst.InstanceKey, forceDiscovery bool) { InstanceLatency: instanceLatency, Err: nil, }) - // we turn on HitAtLeastOneDiscovery first time - // con: this will result in extra memory hit for every recovery cycle. - process.HitAtLeastOneDiscovery.CompareAndSwap(false, true) } // onHealthTick handles the actions to take to discover/poll instances @@ -439,8 +436,27 @@ func ContinuousDiscovery() { } }() case <-tabletTopoTick: - go RefreshAllKeyspaces() - go refreshAllTablets() + // Create a wait group + var wg sync.WaitGroup + + // Refresh all keyspace information. + wg.Add(1) + go func() { + defer wg.Done() + RefreshAllKeyspaces() + }() + + // Refresh all tablets. + wg.Add(1) + go func() { + defer wg.Done() + refreshAllTablets() + }() + + // Wait for both the refreshes to complete + wg.Wait() + // We have completed one discovery cycle in the entirety of it. We should update the process health. + process.FirstDiscoveryCycleComplete.Store(true) } } } diff --git a/go/vt/vtorc/process/health.go b/go/vt/vtorc/process/health.go index 494cfcf7d2d..145e9ea747f 100644 --- a/go/vt/vtorc/process/health.go +++ b/go/vt/vtorc/process/health.go @@ -32,7 +32,7 @@ import ( var lastHealthCheckUnixNano int64 var lastGoodHealthCheckUnixNano int64 var LastContinousCheckHealthy int64 -var HitAtLeastOneDiscovery atomic.Bool +var FirstDiscoveryCycleComplete atomic.Bool var lastHealthCheckCache = cache.New(config.HealthPollSeconds*time.Second, time.Second) @@ -74,7 +74,7 @@ type HealthStatus struct { Hostname string Token string IsActiveNode bool - IsDiscovering bool + HasDiscoveredOnce bool ActiveNode *NodeHealth Error error AvailableNodes [](*NodeHealth) @@ -121,7 +121,7 @@ func HealthTest() (health *HealthStatus, err error) { return health, err } health.Healthy = healthy - health.IsDiscovering = HitAtLeastOneDiscovery.Load() + health.HasDiscoveredOnce = FirstDiscoveryCycleComplete.Load() if health.ActiveNode, health.IsActiveNode, err = ElectedNode(); err != nil { health.Error = err diff --git a/go/vt/vtorc/server/api.go b/go/vt/vtorc/server/api.go index 1a7f0a1c1da..c862e3f5653 100644 --- a/go/vt/vtorc/server/api.go +++ b/go/vt/vtorc/server/api.go @@ -184,7 +184,8 @@ func healthAPIHandler(response http.ResponseWriter, request *http.Request) { return } code := http.StatusOK - if !health.Healthy { + // If the process isn't healthy, or if the first discovery cycle hasn't completed, we return an internal server error. + if !health.Healthy || !health.HasDiscoveredOnce { code = http.StatusInternalServerError } returnAsJSON(response, code, health) From 7b096148c498accc021cc203f9aa6d9fdec7e2f7 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 3 May 2023 16:06:24 +0530 Subject: [PATCH 5/7] feat: fix the test Signed-off-by: Manan Gupta --- go/test/endtoend/vtorc/api/api_test.go | 63 ++++++-------------------- 1 file changed, 14 insertions(+), 49 deletions(-) diff --git a/go/test/endtoend/vtorc/api/api_test.go b/go/test/endtoend/vtorc/api/api_test.go index 6f8f3c07171..886149426c1 100644 --- a/go/test/endtoend/vtorc/api/api_test.go +++ b/go/test/endtoend/vtorc/api/api_test.go @@ -18,30 +18,37 @@ package api import ( "fmt" - "strings" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "vitess.io/vitess/go/json2" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/vtorc/utils" - "vitess.io/vitess/go/vt/vtorc/process" ) -// make an api call to /api/problems endpoint -// and verify the output -func TestProblemsAPI(t *testing.T) { +// TestAPIEndpoints tests the various API endpoints that VTOrc offers. +func TestAPIEndpoints(t *testing.T) { defer cluster.PanicHandler(t) utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 1, nil, cluster.VTOrcConfiguration{ PreventCrossDataCenterPrimaryFailover: true, RecoveryPeriodBlockSeconds: 5, + // The default topo refresh time is 3 seconds. We are intentionally making it slower for the test, so that we have time to verify + // the /debug/health output before and after the first refresh runs. + TopoInformationRefreshSeconds: 10, }, 1, "") keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0] + // Call API with retry to ensure VTOrc is up + status, resp := utils.MakeAPICallRetry(t, vtorc, "/debug/health", func(code int, response string) bool { + return code == 0 + }) + // When VTOrc is up and hasn't run the topo-refresh, is should be healthy but HasDiscovered should be false. + assert.Equal(t, 500, status) + assert.Contains(t, resp, `"Healthy": true,`) + assert.Contains(t, resp, `"HasDiscoveredOnce": false`) // find primary from topo primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) @@ -70,6 +77,7 @@ func TestProblemsAPI(t *testing.T) { require.NoError(t, err) assert.Equal(t, 200, status) assert.Contains(t, resp, `"Healthy": true,`) + assert.Contains(t, resp, `"HasDiscoveredOnce": true`) }) t.Run("Liveness API", func(t *testing.T) { @@ -191,46 +199,3 @@ func TestProblemsAPI(t *testing.T) { assert.Equal(t, "Filtering by shard without keyspace isn't supported\n", resp) }) } - -// TestIfDiscoveringHappenedLaterInHealthCheck checks that `IsDiscovering` flag in `HealthStatus` turns `true`, sometime -// after flag `Healthy` turns `true`. -func TestIfDiscoveringHappenedLaterInHealthCheck(t *testing.T) { - defer cluster.PanicHandler(t) - utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 1, nil, cluster.VTOrcConfiguration{ - PreventCrossDataCenterPrimaryFailover: true, - RecoveryPeriodBlockSeconds: 5, - TopoInformationRefreshSeconds: 5, - }, 1, "") - vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0] - // Call API with retry to ensure health service is up - status, resp := utils.MakeAPICallRetry(t, vtorc, "/debug/health", func(code int, response string) bool { - return code == 0 && (strings.Contains(response, "connection refused") || strings.Contains(response, "")) - }) - assert.Equal(t, 200, status) - // Since TopoInformationRefreshSeconds is default to 3s for this test. This will ensure there will ~3 seconds - // delay between `Healthy` and `IsDiscovering` to turn `true` - assert.Contains(t, resp, `"Healthy": true,`) - assert.Contains(t, resp, `"IsDiscovering": false`) - startTime := time.Now() - for { - // Check that VTOrc is healthy - status, resp, err := utils.MakeAPICall(t, vtorc, "/debug/health") - require.NoError(t, err) - assert.Equal(t, 200, status) - var healthStatus process.HealthStatus - err = json2.Unmarshal([]byte(resp), &healthStatus) - assert.Nil(t, err) - assert.Equal(t, healthStatus.Healthy, true) - if healthStatus.HasDiscoveredOnce { - timeForIsRecovering := time.Now() - require.Greater(t, timeForIsRecovering, startTime) - return - } - if time.Since(startTime) > 10*time.Second { - // time out - t.Fatal("timed out waiting for `timeForIsRecovering` to turned `true`") - return - } - time.Sleep(1 * time.Second) - } -} From 1012b6b64a2ff39985c850d53a2430324c6f16e2 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 8 May 2023 11:43:34 +0530 Subject: [PATCH 6/7] feat: fix naming of variables Signed-off-by: Manan Gupta --- go/test/endtoend/cluster/vtorc_process.go | 2 +- go/test/endtoend/vtorc/api/api_test.go | 2 +- go/vt/vtorc/process/health.go | 4 ++-- go/vt/vtorc/server/api.go | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/go/test/endtoend/cluster/vtorc_process.go b/go/test/endtoend/cluster/vtorc_process.go index 70bfbf90f1f..38f259d8945 100644 --- a/go/test/endtoend/cluster/vtorc_process.go +++ b/go/test/endtoend/cluster/vtorc_process.go @@ -55,7 +55,7 @@ type VTOrcConfiguration struct { MySQLReplicaUser string MySQLReplicaPassword string RecoveryPeriodBlockSeconds int - TopoInformationRefreshSeconds int `json:",omitempty"` + TopologyRefreshSeconds int `json:",omitempty"` PreventCrossDataCenterPrimaryFailover bool `json:",omitempty"` LockShardTimeoutSeconds int `json:",omitempty"` ReplicationLagQuery string `json:",omitempty"` diff --git a/go/test/endtoend/vtorc/api/api_test.go b/go/test/endtoend/vtorc/api/api_test.go index 886149426c1..352d870d827 100644 --- a/go/test/endtoend/vtorc/api/api_test.go +++ b/go/test/endtoend/vtorc/api/api_test.go @@ -36,7 +36,7 @@ func TestAPIEndpoints(t *testing.T) { RecoveryPeriodBlockSeconds: 5, // The default topo refresh time is 3 seconds. We are intentionally making it slower for the test, so that we have time to verify // the /debug/health output before and after the first refresh runs. - TopoInformationRefreshSeconds: 10, + TopologyRefreshSeconds: 10, }, 1, "") keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] diff --git a/go/vt/vtorc/process/health.go b/go/vt/vtorc/process/health.go index 145e9ea747f..22db89e1d56 100644 --- a/go/vt/vtorc/process/health.go +++ b/go/vt/vtorc/process/health.go @@ -74,7 +74,7 @@ type HealthStatus struct { Hostname string Token string IsActiveNode bool - HasDiscoveredOnce bool + DiscoveredOnce bool ActiveNode *NodeHealth Error error AvailableNodes [](*NodeHealth) @@ -121,7 +121,7 @@ func HealthTest() (health *HealthStatus, err error) { return health, err } health.Healthy = healthy - health.HasDiscoveredOnce = FirstDiscoveryCycleComplete.Load() + health.DiscoveredOnce = FirstDiscoveryCycleComplete.Load() if health.ActiveNode, health.IsActiveNode, err = ElectedNode(); err != nil { health.Error = err diff --git a/go/vt/vtorc/server/api.go b/go/vt/vtorc/server/api.go index c862e3f5653..ade523486df 100644 --- a/go/vt/vtorc/server/api.go +++ b/go/vt/vtorc/server/api.go @@ -185,7 +185,7 @@ func healthAPIHandler(response http.ResponseWriter, request *http.Request) { } code := http.StatusOK // If the process isn't healthy, or if the first discovery cycle hasn't completed, we return an internal server error. - if !health.Healthy || !health.HasDiscoveredOnce { + if !health.Healthy || !health.DiscoveredOnce { code = http.StatusInternalServerError } returnAsJSON(response, code, health) From cfbe2680b8b31c9abd2f51945d79b2542a7051d1 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 8 May 2023 14:56:55 +0530 Subject: [PATCH 7/7] feat: fix tests after the changes Signed-off-by: Manan Gupta --- go/test/endtoend/vtorc/api/api_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/vtorc/api/api_test.go b/go/test/endtoend/vtorc/api/api_test.go index 352d870d827..3af5725d3e9 100644 --- a/go/test/endtoend/vtorc/api/api_test.go +++ b/go/test/endtoend/vtorc/api/api_test.go @@ -48,7 +48,7 @@ func TestAPIEndpoints(t *testing.T) { // When VTOrc is up and hasn't run the topo-refresh, is should be healthy but HasDiscovered should be false. assert.Equal(t, 500, status) assert.Contains(t, resp, `"Healthy": true,`) - assert.Contains(t, resp, `"HasDiscoveredOnce": false`) + assert.Contains(t, resp, `"DiscoveredOnce": false`) // find primary from topo primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) @@ -77,7 +77,7 @@ func TestAPIEndpoints(t *testing.T) { require.NoError(t, err) assert.Equal(t, 200, status) assert.Contains(t, resp, `"Healthy": true,`) - assert.Contains(t, resp, `"HasDiscoveredOnce": true`) + assert.Contains(t, resp, `"DiscoveredOnce": true`) }) t.Run("Liveness API", func(t *testing.T) {