diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 36fb4aea22c..60d3aaa3792 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -126,6 +126,7 @@ type stateManager struct { // checkMySQLThrottler ensures that CheckMysql // doesn't get spammed. checkMySQLThrottler *sync2.Semaphore + checkMySQLRunning sync2.AtomicBool timebombDuration time.Duration unhealthyThreshold sync2.AtomicDuration @@ -305,17 +306,21 @@ func (sm *stateManager) recheckState() bool { return false } -// CheckMySQL verifies that we can connect to mysql. +// checkMySQL verifies that we can connect to mysql. // If it fails, then we shutdown the service and initiate // the retry loop. -func (sm *stateManager) CheckMySQL() { +func (sm *stateManager) checkMySQL() { if !sm.checkMySQLThrottler.TryAcquire() { return } + log.Infof("CheckMySQL started") + sm.checkMySQLRunning.Set(true) go func() { defer func() { time.Sleep(1 * time.Second) + sm.checkMySQLRunning.Set(false) sm.checkMySQLThrottler.Release() + log.Infof("CheckMySQL finished") }() err := sm.qe.IsMySQLReachable() @@ -348,6 +353,14 @@ func (sm *stateManager) setWantState(stateWanted servingState) { sm.wantState = stateWanted } +// isCheckMySQLRunning returns 1 if CheckMySQL function is in progress +func (sm *stateManager) isCheckMySQLRunning() int64 { + if sm.checkMySQLRunning.Get() { + return 1 + } + return 0 +} + // StopService shuts down sm. If the shutdown doesn't complete // within timeBombDuration, it crashes the process. func (sm *stateManager) StopService() { diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index ebece7f00c6..6947b51a26d 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -17,6 +17,7 @@ limitations under the License. package tabletserver import ( + "context" "errors" "sync" "testing" @@ -24,13 +25,11 @@ import ( "google.golang.org/protobuf/proto" - "vitess.io/vitess/go/mysql/fakesqldb" - - "context" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" @@ -460,16 +459,11 @@ func TestStateManagerCheckMySQL(t *testing.T) { sm.te = &delayedTxEngine{} sm.qe.(*testQueryEngine).failMySQL = true order.Set(0) - sm.CheckMySQL() - // We know checkMySQL will take atleast 50 milliseconds since txEngine.Close has a sleep in the test code - time.Sleep(10 * time.Millisecond) - // this asserts that checkMySQL is running - assert.EqualValues(t, 0, sm.checkMySQLThrottler.Size()) - // When we are in CheckMySQL state, we should not be accepting any new requests which aren't transactional - assert.False(t, sm.IsServing()) + sm.checkMySQL() + assert.EqualValues(t, 1, sm.isCheckMySQLRunning()) // Rechecking immediately should be a no-op: - sm.CheckMySQL() + sm.checkMySQL() // Wait for closeAll to get under way. for { @@ -501,6 +495,20 @@ func TestStateManagerCheckMySQL(t *testing.T) { assert.True(t, sm.IsServing()) assert.Equal(t, topodatapb.TabletType_PRIMARY, sm.Target().TabletType) assert.Equal(t, StateServing, sm.State()) + + // Wait for checkMySQL to finish. + timeout := time.After(2 * time.Second) + for { + select { + case <-timeout: + t.Fatalf("Timedout waiting for checkMySQL to finish") + default: + if sm.isCheckMySQLRunning() == 0 { + return + } + time.Sleep(100 * time.Millisecond) + } + } } func TestStateManagerValidations(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index c719c2b565d..555e7242d1c 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -124,6 +124,9 @@ type TabletServer struct { // alias is used for identifying this tabletserver in healthcheck responses. alias *topodatapb.TabletAlias + + // This field is only stored for testing + checkMysqlGaugeFunc *stats.GaugeFunc } var _ queryservice.QueryService = (*TabletServer)(nil) @@ -206,6 +209,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to } tsv.exporter.NewGaugeFunc("TabletState", "Tablet server state", func() int64 { return int64(tsv.sm.State()) }) + tsv.checkMysqlGaugeFunc = tsv.exporter.NewGaugeFunc("CheckMySQLRunning", "Check MySQL operation currently in progress", tsv.sm.isCheckMySQLRunning) tsv.exporter.Publish("TabletStateName", stats.StringFunc(tsv.sm.IsServingString)) // TabletServerState exports the same information as the above two stats (TabletState / TabletStateName), @@ -1552,7 +1556,7 @@ func (tsv *TabletServer) IsServing() bool { // to no more than once per second. // The function satisfies tabletenv.Env. func (tsv *TabletServer) CheckMySQL() { - tsv.sm.CheckMySQL() + tsv.sm.checkMySQL() } // TopoServer returns the topo server. diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 041e1f10559..b4add92168a 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -1308,6 +1308,34 @@ func TestMessageStream(t *testing.T) { } } +func TestCheckMySQLGauge(t *testing.T) { + _, tsv, db := newTestTxExecutor(t) + defer db.Close() + defer tsv.StopService() + + // Check that initially checkMySQLGauge has 0 value + assert.EqualValues(t, 0, tsv.checkMysqlGaugeFunc.Get()) + tsv.CheckMySQL() + // After the checkMySQL call checkMySQLGauge should have 1 value + assert.EqualValues(t, 1, tsv.checkMysqlGaugeFunc.Get()) + + // Wait for CheckMySQL to finish. + // This wait is required because CheckMySQL waits for 1 second after it finishes execution + // before letting go of the acquired locks. + timeout := time.After(2 * time.Second) + for { + select { + case <-timeout: + t.Fatalf("Timedout waiting for CheckMySQL to finish") + default: + if tsv.checkMysqlGaugeFunc.Get() == 0 { + return + } + time.Sleep(100 * time.Millisecond) + } + } +} + func TestMessageAck(t *testing.T) { _, tsv, db := newTestTxExecutor(t) defer db.Close()