From 13fba350d23a0ef0189d36611cb0009f12de0b42 Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Thu, 20 Oct 2022 15:19:00 +0530 Subject: [PATCH 1/2] Add Gauge For CheckMySQL Running (#11524) * feat: add gauge for checkMySQL running Signed-off-by: Manan Gupta * feat: fix data race by converting bool to atomic bool Signed-off-by: Manan Gupta Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletserver/state_manager.go | 17 +++++++++-- .../tabletserver/state_manager_test.go | 26 +++++++++++++---- go/vt/vttablet/tabletserver/tabletserver.go | 6 +++- .../tabletserver/tabletserver_test.go | 28 +++++++++++++++++++ 4 files changed, 68 insertions(+), 9 deletions(-) diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 73580c75764..4d1ef48f6e1 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -122,6 +122,7 @@ type stateManager struct { // checkMySQLThrottler ensures that CheckMysql // doesn't get spammed. checkMySQLThrottler *sync2.Semaphore + checkMySQLRunning sync2.AtomicBool timebombDuration time.Duration unhealthyThreshold sync2.AtomicDuration @@ -301,17 +302,21 @@ func (sm *stateManager) recheckState() bool { return false } -// CheckMySQL verifies that we can connect to mysql. +// checkMySQL verifies that we can connect to mysql. // If it fails, then we shutdown the service and initiate // the retry loop. 
-func (sm *stateManager) CheckMySQL() { +func (sm *stateManager) checkMySQL() { if !sm.checkMySQLThrottler.TryAcquire() { return } + log.Infof("CheckMySQL started") + sm.checkMySQLRunning.Set(true) go func() { defer func() { time.Sleep(1 * time.Second) + sm.checkMySQLRunning.Set(false) sm.checkMySQLThrottler.Release() + log.Infof("CheckMySQL finished") }() err := sm.qe.IsMySQLReachable() @@ -330,6 +335,14 @@ func (sm *stateManager) CheckMySQL() { }() } +// isCheckMySQLRunning returns 1 if CheckMySQL function is in progress +func (sm *stateManager) isCheckMySQLRunning() int64 { + if sm.checkMySQLRunning.Get() { + return 1 + } + return 0 +} + // StopService shuts down sm. If the shutdown doesn't complete // within timeBombDuration, it crashes the process. func (sm *stateManager) StopService() { diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 04817f3bf9b..8973ac942aa 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -17,6 +17,7 @@ limitations under the License. package tabletserver import ( + "context" "errors" "sync" "testing" @@ -24,13 +25,11 @@ import ( "google.golang.org/protobuf/proto" - "vitess.io/vitess/go/mysql/fakesqldb" - - "context" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" @@ -459,10 +458,11 @@ func TestStateManagerCheckMySQL(t *testing.T) { sm.qe.(*testQueryEngine).failMySQL = true order.Set(0) - sm.CheckMySQL() + sm.checkMySQL() + assert.EqualValues(t, 1, sm.isCheckMySQLRunning()) // Rechecking immediately should be a no-op: - sm.CheckMySQL() + sm.checkMySQL() // Wait for closeAll to get under way. 
for { @@ -493,6 +493,20 @@ func TestStateManagerCheckMySQL(t *testing.T) { assert.Equal(t, topodatapb.TabletType_PRIMARY, sm.Target().TabletType) assert.Equal(t, StateServing, sm.State()) + + // Wait for checkMySQL to finish. + timeout := time.After(2 * time.Second) + for { + select { + case <-timeout: + t.Fatalf("Timedout waiting for checkMySQL to finish") + default: + if sm.isCheckMySQLRunning() == 0 { + return + } + time.Sleep(100 * time.Millisecond) + } + } } func TestStateManagerValidations(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 0580cb5e79d..f3690f50a35 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -123,6 +123,9 @@ type TabletServer struct { // alias is used for identifying this tabletserver in healthcheck responses. alias *topodatapb.TabletAlias + + // This field is only stored for testing + checkMysqlGaugeFunc *stats.GaugeFunc } var _ queryservice.QueryService = (*TabletServer)(nil) @@ -205,6 +208,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to } tsv.exporter.NewGaugeFunc("TabletState", "Tablet server state", func() int64 { return int64(tsv.sm.State()) }) + tsv.checkMysqlGaugeFunc = tsv.exporter.NewGaugeFunc("CheckMySQLRunning", "Check MySQL operation currently in progress", tsv.sm.isCheckMySQLRunning) tsv.exporter.Publish("TabletStateName", stats.StringFunc(tsv.sm.IsServingString)) // TabletServerState exports the same information as the above two stats (TabletState / TabletStateName), @@ -1498,7 +1502,7 @@ func (tsv *TabletServer) IsServing() bool { // to no more than once per second. // The function satisfies tabletenv.Env. func (tsv *TabletServer) CheckMySQL() { - tsv.sm.CheckMySQL() + tsv.sm.checkMySQL() } // TopoServer returns the topo server. 
diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 2a703a14067..95c73f923e1 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -1636,6 +1636,34 @@ func TestMessageStream(t *testing.T) { } } +func TestCheckMySQLGauge(t *testing.T) { + _, tsv, db := newTestTxExecutor(t) + defer db.Close() + defer tsv.StopService() + + // Check that initially checkMySQLGauge has 0 value + assert.EqualValues(t, 0, tsv.checkMysqlGaugeFunc.Get()) + tsv.CheckMySQL() + // After the checkMySQL call checkMySQLGauge should have 1 value + assert.EqualValues(t, 1, tsv.checkMysqlGaugeFunc.Get()) + + // Wait for CheckMySQL to finish. + // This wait is required because CheckMySQL waits for 1 second after it finishes execution + // before letting go of the acquired locks. + timeout := time.After(2 * time.Second) + for { + select { + case <-timeout: + t.Fatalf("Timedout waiting for CheckMySQL to finish") + default: + if tsv.checkMysqlGaugeFunc.Get() == 0 { + return + } + time.Sleep(100 * time.Millisecond) + } + } +} + func TestMessageAck(t *testing.T) { _, tsv, db := newTestTxExecutor(t) defer db.Close() From 980dd9533bbeb68e898bcf8e447d8e83733fdd9c Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Wed, 7 Dec 2022 17:02:40 +0530 Subject: [PATCH 2/2] feat: fix checkMySQL and add tests and documentation (#11895) Signed-off-by: Manan Gupta Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletserver/state_manager.go | 20 ++++++++++++++++++- .../tabletserver/state_manager_test.go | 6 ++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 4d1ef48f6e1..5d18fcca38e 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -38,10 +38,14 @@ type servingState int64 
const ( // StateNotConnected is the state where tabletserver is not - // connected to an underlying mysql instance. + // connected to an underlying mysql instance. In this state we close + // the query engine since MySQL is probably unavailable. StateNotConnected = servingState(iota) // StateNotServing is the state where tabletserver is connected // to an underlying mysql instance, but is not serving queries. + // We do not close the query engine, so that the pool is not closed. We keep + // the query engine open but prevent queries from running by blocking them + // in StartRequest. StateNotServing // StateServing is where queries are allowed. StateServing @@ -330,11 +334,25 @@ func (sm *stateManager) checkMySQL() { } defer sm.transitioning.Release() + // This is required to prevent new queries from running in StartRequest + // unless they are part of a running transaction. + sm.setWantState(StateNotConnected) sm.closeAll() + + // Now that we reached the NotConnected state, we want to go back to the + // Serving state. The retry will only succeed once MySQL is reachable again. + // Until then, EnsureConnectionAndDB will error out. 
+ sm.setWantState(StateServing) sm.retryTransition(fmt.Sprintf("Cannot connect to MySQL, shutting down query service: %v", err)) }() } +func (sm *stateManager) setWantState(stateWanted servingState) { + sm.mu.Lock() + defer sm.mu.Unlock() + sm.wantState = stateWanted +} + // isCheckMySQLRunning returns 1 if CheckMySQL function is in progress func (sm *stateManager) isCheckMySQLRunning() int64 { if sm.checkMySQLRunning.Get() { diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 8973ac942aa..5e0fb4cbdf2 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -456,10 +456,15 @@ func TestStateManagerCheckMySQL(t *testing.T) { err := sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "") require.NoError(t, err) + sm.te = &delayedTxEngine{} sm.qe.(*testQueryEngine).failMySQL = true order.Set(0) sm.checkMySQL() + // We know checkMySQL will take at least 50 milliseconds since txEngine.Close has a sleep in the test code + time.Sleep(10 * time.Millisecond) assert.EqualValues(t, 1, sm.isCheckMySQLRunning()) + // When we are in CheckMySQL state, we should not be accepting any new requests which aren't transactional + assert.False(t, sm.IsServing()) // Rechecking immediately should be a no-op: sm.checkMySQL() @@ -491,6 +496,7 @@ func TestStateManagerCheckMySQL(t *testing.T) { time.Sleep(10 * time.Millisecond) } + assert.True(t, sm.IsServing()) assert.Equal(t, topodatapb.TabletType_PRIMARY, sm.Target().TabletType) assert.Equal(t, StateServing, sm.State())