diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 73580c75764..5d18fcca38e 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -38,10 +38,14 @@ type servingState int64 const ( // StateNotConnected is the state where tabletserver is not - // connected to an underlying mysql instance. + // connected to an underlying mysql instance. In this state we close the + // query engine since MySQL is probably unavailable. StateNotConnected = servingState(iota) // StateNotServing is the state where tabletserver is connected // to an underlying mysql instance, but is not serving queries. + // We do not close the query engine in this state, so that the pool is not closed. We keep + // the query engine open but prevent queries from running by blocking them + // in StartRequest. StateNotServing // StateServing is where queries are allowed. StateServing @@ -122,6 +126,7 @@ type stateManager struct { // checkMySQLThrottler ensures that CheckMysql // doesn't get spammed. checkMySQLThrottler *sync2.Semaphore + checkMySQLRunning sync2.AtomicBool timebombDuration time.Duration unhealthyThreshold sync2.AtomicDuration @@ -301,17 +306,21 @@ func (sm *stateManager) recheckState() bool { return false } -// CheckMySQL verifies that we can connect to mysql. +// checkMySQL verifies that we can connect to mysql. // If it fails, then we shutdown the service and initiate // the retry loop. 
-func (sm *stateManager) CheckMySQL() { +func (sm *stateManager) checkMySQL() { if !sm.checkMySQLThrottler.TryAcquire() { return } + log.Infof("CheckMySQL started") + sm.checkMySQLRunning.Set(true) go func() { defer func() { time.Sleep(1 * time.Second) + sm.checkMySQLRunning.Set(false) sm.checkMySQLThrottler.Release() + log.Infof("CheckMySQL finished") }() err := sm.qe.IsMySQLReachable() @@ -325,11 +334,33 @@ func (sm *stateManager) CheckMySQL() { } defer sm.transitioning.Release() + // Setting wantState to StateNotConnected is required to prevent new queries from running in StartRequest + // unless they are part of a running transaction. + sm.setWantState(StateNotConnected) sm.closeAll() + + // Now that we reached the NotConnected state, we want to go back to the + // Serving state. The retry will only succeed once MySQL is reachable again. + // Until then EnsureConnectionAndDB will error out. + sm.setWantState(StateServing) sm.retryTransition(fmt.Sprintf("Cannot connect to MySQL, shutting down query service: %v", err)) }() } +func (sm *stateManager) setWantState(stateWanted servingState) { + sm.mu.Lock() + defer sm.mu.Unlock() + sm.wantState = stateWanted +} + +// isCheckMySQLRunning returns 1 while a checkMySQL run is in progress (the flag is cleared only after its trailing one-second sleep), and 0 otherwise. +func (sm *stateManager) isCheckMySQLRunning() int64 { + if sm.checkMySQLRunning.Get() { + return 1 + } + return 0 +} + // StopService shuts down sm. If the shutdown doesn't complete // within timeBombDuration, it crashes the process. func (sm *stateManager) StopService() { diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 04817f3bf9b..5e0fb4cbdf2 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package tabletserver import ( + "context" "errors" "sync" "testing" @@ -24,13 +25,11 @@ import ( "google.golang.org/protobuf/proto" - "vitess.io/vitess/go/mysql/fakesqldb" - - "context" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" @@ -457,12 +456,18 @@ func TestStateManagerCheckMySQL(t *testing.T) { err := sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "") require.NoError(t, err) + sm.te = &delayedTxEngine{} sm.qe.(*testQueryEngine).failMySQL = true order.Set(0) - sm.CheckMySQL() + sm.checkMySQL() + // We know checkMySQL will take at least 50 milliseconds since txEngine.Close has a sleep in the test code. + time.Sleep(10 * time.Millisecond) + assert.EqualValues(t, 1, sm.isCheckMySQLRunning()) + // While checkMySQL is running, we should not be accepting any new requests which aren't transactional. + assert.False(t, sm.IsServing()) // Rechecking immediately should be a no-op: - sm.CheckMySQL() + sm.checkMySQL() // Wait for closeAll to get under way. for { @@ -491,8 +496,23 @@ func TestStateManagerCheckMySQL(t *testing.T) { time.Sleep(10 * time.Millisecond) } + assert.True(t, sm.IsServing()) assert.Equal(t, topodatapb.TabletType_PRIMARY, sm.Target().TabletType) assert.Equal(t, StateServing, sm.State()) + + // Wait for checkMySQL to finish. 
+ timeout := time.After(2 * time.Second) + for { + select { + case <-timeout: + t.Fatalf("Timedout waiting for checkMySQL to finish") + default: + if sm.isCheckMySQLRunning() == 0 { + return + } + time.Sleep(100 * time.Millisecond) + } + } } func TestStateManagerValidations(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 0580cb5e79d..f3690f50a35 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -123,6 +123,9 @@ type TabletServer struct { // alias is used for identifying this tabletserver in healthcheck responses. alias *topodatapb.TabletAlias + + // checkMysqlGaugeFunc holds the CheckMySQLRunning gauge; it is only stored for testing. + checkMysqlGaugeFunc *stats.GaugeFunc } var _ queryservice.QueryService = (*TabletServer)(nil) @@ -205,6 +208,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to } tsv.exporter.NewGaugeFunc("TabletState", "Tablet server state", func() int64 { return int64(tsv.sm.State()) }) + tsv.checkMysqlGaugeFunc = tsv.exporter.NewGaugeFunc("CheckMySQLRunning", "Check MySQL operation currently in progress", tsv.sm.isCheckMySQLRunning) tsv.exporter.Publish("TabletStateName", stats.StringFunc(tsv.sm.IsServingString)) // TabletServerState exports the same information as the above two stats (TabletState / TabletStateName), @@ -1498,7 +1502,7 @@ func (tsv *TabletServer) IsServing() bool { // to no more than once per second. // The function satisfies tabletenv.Env. func (tsv *TabletServer) CheckMySQL() { - tsv.sm.CheckMySQL() + tsv.sm.checkMySQL() } // TopoServer returns the topo server. 
diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 2a703a14067..95c73f923e1 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -1636,6 +1636,34 @@ func TestMessageStream(t *testing.T) { } } +func TestCheckMySQLGauge(t *testing.T) { + _, tsv, db := newTestTxExecutor(t) + defer db.Close() + defer tsv.StopService() + + // Check that the CheckMySQLRunning gauge is initially 0. + assert.EqualValues(t, 0, tsv.checkMysqlGaugeFunc.Get()) + tsv.CheckMySQL() + // Right after the CheckMySQL call the CheckMySQLRunning gauge should read 1. + assert.EqualValues(t, 1, tsv.checkMysqlGaugeFunc.Get()) + + // Wait for CheckMySQL to finish. + // This wait is required because CheckMySQL waits for 1 second after it finishes execution + // before letting go of the acquired locks. + timeout := time.After(2 * time.Second) + for { + select { + case <-timeout: + t.Fatalf("Timedout waiting for CheckMySQL to finish") + default: + if tsv.checkMysqlGaugeFunc.Get() == 0 { + return + } + time.Sleep(100 * time.Millisecond) + } + } +} + func TestMessageAck(t *testing.T) { _, tsv, db := newTestTxExecutor(t) defer db.Close()