From bb402cdd46bd20f4087dda95fe8da2fac8a8700d Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 18 Oct 2022 13:07:09 +0530 Subject: [PATCH 1/2] feat: add gauge for checkMySQL running Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletserver/state_manager.go | 17 +++++++++-- .../tabletserver/state_manager_test.go | 26 +++++++++++++---- go/vt/vttablet/tabletserver/tabletserver.go | 6 +++- .../tabletserver/tabletserver_test.go | 28 +++++++++++++++++++ 4 files changed, 68 insertions(+), 9 deletions(-) diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 341bc12d1fd..1257e501b3f 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -122,6 +122,7 @@ type stateManager struct { // checkMySQLThrottler ensures that CheckMysql // doesn't get spammed. checkMySQLThrottler *sync2.Semaphore + checkMySQLRunning bool timebombDuration time.Duration unhealthyThreshold sync2.AtomicDuration @@ -301,17 +302,21 @@ func (sm *stateManager) recheckState() bool { return false } -// CheckMySQL verifies that we can connect to mysql. +// checkMySQL verifies that we can connect to mysql. // If it fails, then we shutdown the service and initiate // the retry loop. -func (sm *stateManager) CheckMySQL() { +func (sm *stateManager) checkMySQL() { if !sm.checkMySQLThrottler.TryAcquire() { return } + log.Infof("CheckMySQL started") + sm.checkMySQLRunning = true go func() { defer func() { time.Sleep(1 * time.Second) + sm.checkMySQLRunning = false sm.checkMySQLThrottler.Release() + log.Infof("CheckMySQL finished") }() err := sm.qe.IsMySQLReachable() @@ -330,6 +335,14 @@ func (sm *stateManager) CheckMySQL() { }() } +// isCheckMySQLRunning returns 1 if CheckMySQL function is in progress +func (sm *stateManager) isCheckMySQLRunning() int64 { + if sm.checkMySQLRunning { + return 1 + } + return 0 +} + // StopService shuts down sm. If the shutdown doesn't complete // within timeBombDuration, it crashes the process. func (sm *stateManager) StopService() { diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 4953a3affe4..c836d60f22e 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -17,6 +17,7 @@ limitations under the License. package tabletserver import ( + "context" "errors" "sync" "testing" @@ -24,13 +25,11 @@ import ( "google.golang.org/protobuf/proto" - "vitess.io/vitess/go/mysql/fakesqldb" - - "context" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" @@ -459,10 +458,11 @@ func TestStateManagerCheckMySQL(t *testing.T) { sm.qe.(*testQueryEngine).failMySQL = true order.Set(0) - sm.CheckMySQL() + sm.checkMySQL() + assert.EqualValues(t, 1, sm.isCheckMySQLRunning()) // Rechecking immediately should be a no-op: - sm.CheckMySQL() + sm.checkMySQL() // Wait for closeAll to get under way. for { @@ -493,6 +493,20 @@ func TestStateManagerCheckMySQL(t *testing.T) { assert.Equal(t, topodatapb.TabletType_PRIMARY, sm.Target().TabletType) assert.Equal(t, StateServing, sm.State()) + + // Wait for checkMySQL to finish. + timeout := time.After(2 * time.Second) + for { + select { + case <-timeout: + t.Fatalf("Timedout waiting for checkMySQL to finish") + default: + if sm.isCheckMySQLRunning() == 0 { + return + } + time.Sleep(100 * time.Millisecond) + } + } } func TestStateManagerValidations(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index c8959d5db7e..2f76020a3b6 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -125,6 +125,9 @@ type TabletServer struct { // alias is used for identifying this tabletserver in healthcheck responses. alias *topodatapb.TabletAlias + + // This field is only stored for testing + checkMysqlGaugeFunc *stats.GaugeFunc } var _ queryservice.QueryService = (*TabletServer)(nil) @@ -206,6 +209,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to } tsv.exporter.NewGaugeFunc("TabletState", "Tablet server state", func() int64 { return int64(tsv.sm.State()) }) + tsv.checkMysqlGaugeFunc = tsv.exporter.NewGaugeFunc("CheckMySQLRunning", "Check MySQL operation currently in progress", tsv.sm.isCheckMySQLRunning) tsv.exporter.Publish("TabletStateName", stats.StringFunc(tsv.sm.IsServingString)) // TabletServerState exports the same information as the above two stats (TabletState / TabletStateName), @@ -1671,7 +1675,7 @@ func (tsv *TabletServer) IsServing() bool { // to no more than once per second. // The function satisfies tabletenv.Env. func (tsv *TabletServer) CheckMySQL() { - tsv.sm.CheckMySQL() + tsv.sm.checkMySQL() } // TopoServer returns the topo server. diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 36e983d2ecf..6f4981310e7 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -1253,6 +1253,34 @@ func TestMessageStream(t *testing.T) { } } +func TestCheckMySQLGauge(t *testing.T) { + _, tsv, db := newTestTxExecutor(t) + defer db.Close() + defer tsv.StopService() + + // Check that initially checkMySQLGauge has 0 value + assert.EqualValues(t, 0, tsv.checkMysqlGaugeFunc.Get()) + tsv.CheckMySQL() + // After the checkMySQL call checkMySQLGauge should have 1 value + assert.EqualValues(t, 1, tsv.checkMysqlGaugeFunc.Get()) + + // Wait for CheckMySQL to finish. + // This wait is required because CheckMySQL waits for 1 second after it finishes execution + // before letting go of the acquired locks. + timeout := time.After(2 * time.Second) + for { + select { + case <-timeout: + t.Fatalf("Timedout waiting for CheckMySQL to finish") + default: + if tsv.checkMysqlGaugeFunc.Get() == 0 { + return + } + time.Sleep(100 * time.Millisecond) + } + } +} + func TestMessageAck(t *testing.T) { _, tsv, db := newTestTxExecutor(t) defer db.Close() From db1564f41997f8013b688164292970696171acfd Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 19 Oct 2022 12:21:40 +0530 Subject: [PATCH 2/2] feat: fix data race by converting bool to atomic bool Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletserver/state_manager.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 1257e501b3f..d0e67a1040f 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -122,7 +122,7 @@ type stateManager struct { // checkMySQLThrottler ensures that CheckMysql // doesn't get spammed. checkMySQLThrottler *sync2.Semaphore - checkMySQLRunning bool + checkMySQLRunning sync2.AtomicBool timebombDuration time.Duration unhealthyThreshold sync2.AtomicDuration @@ -310,11 +310,11 @@ func (sm *stateManager) checkMySQL() { return } log.Infof("CheckMySQL started") - sm.checkMySQLRunning = true + sm.checkMySQLRunning.Set(true) go func() { defer func() { time.Sleep(1 * time.Second) - sm.checkMySQLRunning = false + sm.checkMySQLRunning.Set(false) sm.checkMySQLThrottler.Release() log.Infof("CheckMySQL finished") }() @@ -337,7 +337,7 @@ func (sm *stateManager) checkMySQL() { // isCheckMySQLRunning returns 1 if CheckMySQL function is in progress func (sm *stateManager) isCheckMySQLRunning() int64 { - if sm.checkMySQLRunning { + if sm.checkMySQLRunning.Get() { return 1 } return 0