Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ type servingState int64

const (
// StateNotConnected is the state where tabletserver is not
// connected to an underlying mysql instance.
// connected to an underlying mysql instance. In this state we close
// the query engine since MySQL is probably unavailable.
StateNotConnected = servingState(iota)
// StateNotServing is the state where tabletserver is connected
// to an underlying mysql instance, but is not serving queries.
// We do not close the query engine to not close the pool. We keep
// the query engine open but prevent queries from running by blocking them
// in StartRequest.
StateNotServing
// StateServing is where queries are allowed.
StateServing
Expand Down Expand Up @@ -122,6 +126,7 @@ type stateManager struct {
// checkMySQLThrottler ensures that CheckMysql
// doesn't get spammed.
checkMySQLThrottler *sync2.Semaphore
checkMySQLRunning sync2.AtomicBool

timebombDuration time.Duration
unhealthyThreshold sync2.AtomicDuration
Expand Down Expand Up @@ -301,17 +306,21 @@ func (sm *stateManager) recheckState() bool {
return false
}

// CheckMySQL verifies that we can connect to mysql.
// checkMySQL verifies that we can connect to mysql.
// If it fails, then we shutdown the service and initiate
// the retry loop.
func (sm *stateManager) CheckMySQL() {
func (sm *stateManager) checkMySQL() {
if !sm.checkMySQLThrottler.TryAcquire() {
return
}
log.Infof("CheckMySQL started")
sm.checkMySQLRunning.Set(true)
go func() {
defer func() {
time.Sleep(1 * time.Second)
sm.checkMySQLRunning.Set(false)
sm.checkMySQLThrottler.Release()
log.Infof("CheckMySQL finished")
}()

err := sm.qe.IsMySQLReachable()
Expand All @@ -325,11 +334,33 @@ func (sm *stateManager) CheckMySQL() {
}
defer sm.transitioning.Release()

// This is required to prevent new queries from running in StartRequest
// unless they are part of a running transaction.
sm.setWantState(StateNotConnected)
sm.closeAll()

// Now that we reached the NotConnected state, we want to go back to the
// Serving state. The retry will only succeed once MySQL is reachable again.
// Until then EnsureConnectionAndDB will error out.
sm.setWantState(StateServing)
sm.retryTransition(fmt.Sprintf("Cannot connect to MySQL, shutting down query service: %v", err))
}()
}

// setWantState records stateWanted as the state the manager should
// transition to. The write to sm.wantState is done while holding sm.mu.
func (sm *stateManager) setWantState(stateWanted servingState) {
	sm.mu.Lock()
	sm.wantState = stateWanted
	sm.mu.Unlock()
}

// isCheckMySQLRunning exposes the checkMySQLRunning flag as an integer:
// 1 while the flag is set (a checkMySQL run is in progress), 0 otherwise.
func (sm *stateManager) isCheckMySQLRunning() int64 {
	var running int64
	if sm.checkMySQLRunning.Get() {
		running = 1
	}
	return running
}

// StopService shuts down sm. If the shutdown doesn't complete
// within timeBombDuration, it crashes the process.
func (sm *stateManager) StopService() {
Expand Down
32 changes: 26 additions & 6 deletions go/vt/vttablet/tabletserver/state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,19 @@ limitations under the License.
package tabletserver

import (
"context"
"errors"
"sync"
"testing"
"time"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/mysql/fakesqldb"

"context"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/fakesqldb"

"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -457,12 +456,18 @@ func TestStateManagerCheckMySQL(t *testing.T) {
err := sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "")
require.NoError(t, err)

sm.te = &delayedTxEngine{}
sm.qe.(*testQueryEngine).failMySQL = true
order.Set(0)
sm.CheckMySQL()
sm.checkMySQL()
// We know checkMySQL will take at least 50 milliseconds since txEngine.Close has a sleep in the test code.
time.Sleep(10 * time.Millisecond)
assert.EqualValues(t, 1, sm.isCheckMySQLRunning())
// When we are in CheckMySQL state, we should not be accepting any new requests which aren't transactional
assert.False(t, sm.IsServing())

// Rechecking immediately should be a no-op:
sm.CheckMySQL()
sm.checkMySQL()

// Wait for closeAll to get under way.
for {
Expand Down Expand Up @@ -491,8 +496,23 @@ func TestStateManagerCheckMySQL(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}

assert.True(t, sm.IsServing())
assert.Equal(t, topodatapb.TabletType_PRIMARY, sm.Target().TabletType)
assert.Equal(t, StateServing, sm.State())

// Wait for checkMySQL to finish.
timeout := time.After(2 * time.Second)
for {
select {
case <-timeout:
t.Fatalf("Timedout waiting for checkMySQL to finish")
default:
if sm.isCheckMySQLRunning() == 0 {
return
}
time.Sleep(100 * time.Millisecond)
}
}
}

func TestStateManagerValidations(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ type TabletServer struct {

// alias is used for identifying this tabletserver in healthcheck responses.
alias *topodatapb.TabletAlias

// This field is only stored for testing
checkMysqlGaugeFunc *stats.GaugeFunc
}

var _ queryservice.QueryService = (*TabletServer)(nil)
Expand Down Expand Up @@ -205,6 +208,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to
}

tsv.exporter.NewGaugeFunc("TabletState", "Tablet server state", func() int64 { return int64(tsv.sm.State()) })
tsv.checkMysqlGaugeFunc = tsv.exporter.NewGaugeFunc("CheckMySQLRunning", "Check MySQL operation currently in progress", tsv.sm.isCheckMySQLRunning)
tsv.exporter.Publish("TabletStateName", stats.StringFunc(tsv.sm.IsServingString))

// TabletServerState exports the same information as the above two stats (TabletState / TabletStateName),
Expand Down Expand Up @@ -1498,7 +1502,7 @@ func (tsv *TabletServer) IsServing() bool {
// to no more than once per second.
// The function satisfies tabletenv.Env.
func (tsv *TabletServer) CheckMySQL() {
tsv.sm.CheckMySQL()
tsv.sm.checkMySQL()
}

// TopoServer returns the topo server.
Expand Down
28 changes: 28 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1636,6 +1636,34 @@ func TestMessageStream(t *testing.T) {
}
}

// TestCheckMySQLGauge verifies that the gauge stored in
// tsv.checkMysqlGaugeFunc reads 0 before CheckMySQL is invoked, reads 1
// right after the call, and falls back to 0 within two seconds.
func TestCheckMySQLGauge(t *testing.T) {
	_, tsv, db := newTestTxExecutor(t)
	defer db.Close()
	defer tsv.StopService()

	gauge := tsv.checkMysqlGaugeFunc

	// Nothing has requested a check yet, so the gauge must start at 0.
	assert.EqualValues(t, 0, gauge.Get())
	tsv.CheckMySQL()
	// Immediately after the call the gauge must report 1.
	assert.EqualValues(t, 1, gauge.Get())

	// Poll until the gauge drops back to 0. The wait is needed because
	// CheckMySQL sleeps for 1 second after it finishes execution before
	// letting go of the acquired locks.
	expired := time.After(2 * time.Second)
	for {
		// Fail the test once the two second budget is used up.
		select {
		case <-expired:
			t.Fatalf("Timedout waiting for CheckMySQL to finish")
		default:
		}
		if gauge.Get() == 0 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
}

func TestMessageAck(t *testing.T) {
_, tsv, db := newTestTxExecutor(t)
defer db.Close()
Expand Down