Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions go/vt/vttablet/tabletserver/health_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"vitess.io/vitess/go/history"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)
Expand All @@ -46,6 +48,8 @@ type healthStreamer struct {
unhealthyThreshold time.Duration

mu sync.Mutex
ctx context.Context
cancel context.CancelFunc
clients map[chan *querypb.StreamHealthResponse]struct{}
state *querypb.StreamHealthResponse

Expand All @@ -72,17 +76,45 @@ func newHealthStreamer(env tabletenv.Env, alias topodatapb.TabletAlias) *healthS
}

func (hs *healthStreamer) InitDBConfig(target querypb.Target) {
hs.state.Target = &target
// Weird test failures happen if we don't instantiate
// a separate variable.
inner := target
hs.state.Target = &inner
}

func (hs *healthStreamer) Open() {
hs.mu.Lock()
defer hs.mu.Unlock()

if hs.cancel != nil {
return
}
hs.ctx, hs.cancel = context.WithCancel(context.TODO())
}

func (hs *healthStreamer) Close() {
hs.mu.Lock()
defer hs.mu.Unlock()

if hs.cancel != nil {
hs.cancel()
hs.cancel = nil
}
}

func (hs *healthStreamer) Stream(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
ch := hs.register()
ch, hsCtx := hs.register()
if hsCtx == nil {
return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "tabletserver is shutdown")
}
defer hs.unregister(ch)

for {
select {
case <-ctx.Done():
return nil
case <-hsCtx.Done():
return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "tabletserver is shutdown")
case shr := <-ch:
if err := callback(shr); err != nil {
if err == io.EOF {
Expand All @@ -94,16 +126,20 @@ func (hs *healthStreamer) Stream(ctx context.Context, callback func(*querypb.Str
}
}

func (hs *healthStreamer) register() chan *querypb.StreamHealthResponse {
func (hs *healthStreamer) register() (chan *querypb.StreamHealthResponse, context.Context) {
hs.mu.Lock()
defer hs.mu.Unlock()

if hs.cancel == nil {
return nil, nil
}

ch := make(chan *querypb.StreamHealthResponse, 1)
hs.clients[ch] = struct{}{}

// Send the current state immediately.
ch <- proto.Clone(hs.state).(*querypb.StreamHealthResponse)
return ch
return ch, hs.ctx
}

func (hs *healthStreamer) unregister(ch chan *querypb.StreamHealthResponse) {
Expand Down
17 changes: 17 additions & 0 deletions go/vt/vttablet/tabletserver/health_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

func TestHealthStreamerClosed(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, "ReplTrackerTest")
alias := topodatapb.TabletAlias{
Cell: "cell",
Uid: 1,
}
blpFunc = testBlpFunc
hs := newHealthStreamer(env, alias)
err := hs.Stream(context.Background(), func(shr *querypb.StreamHealthResponse) error {
return nil
})
assert.Contains(t, err.Error(), "tabletserver is shutdown")
}

func TestHealthStreamerBroadcast(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, "ReplTrackerTest")
Expand All @@ -37,6 +52,8 @@ func TestHealthStreamerBroadcast(t *testing.T) {
}
blpFunc = testBlpFunc
hs := newHealthStreamer(env, alias)
hs.Open()
defer hs.Close()
target := querypb.Target{}
hs.InitDBConfig(target)

Expand Down
13 changes: 5 additions & 8 deletions go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type stateManager struct {
// Open must be done in forward order.
// Close must be done in reverse order.
// All Close functions must be called before Open.
hs *healthStreamer
se schemaEngine
rt replTracker
vstreamer subComponent
Expand All @@ -105,10 +106,6 @@ type stateManager struct {
te txEngine
messager subComponent

// notify will be invoked by stateManager on every state change.
// The implementation is provided by healthStreamer.ChangeState.
notify func(topodatapb.TabletType, time.Time, time.Duration, error, bool)

// hcticks starts on initialiazation and runs forever.
hcticks *timer.Timer

Expand Down Expand Up @@ -181,7 +178,7 @@ func (sm *stateManager) Init(env tabletenv.Env, target querypb.Target) {
func (sm *stateManager) SetServingType(tabletType topodatapb.TabletType, terTimestamp time.Time, state servingState, reason string) error {
defer sm.ExitLameduck()

// Start is idempotent.
sm.hs.Open()
sm.hcticks.Start(sm.Broadcast)

if tabletType == topodatapb.TabletType_RESTORE || tabletType == topodatapb.TabletType_BACKUP {
Expand Down Expand Up @@ -313,9 +310,9 @@ func (sm *stateManager) StopService() {
defer close(sm.setTimeBomb())

log.Info("Stopping TabletServer")
// Stop replica tracking because StopService is used by all tests.
sm.hcticks.Stop()
sm.SetServingType(sm.Target().TabletType, time.Time{}, StateNotConnected, "service stopped")
sm.hcticks.Stop()
sm.hs.Close()
}

// StartRequest validates the current state and target and registers
Expand Down Expand Up @@ -569,7 +566,7 @@ func (sm *stateManager) Broadcast() {
defer sm.mu.Unlock()

lag, err := sm.refreshReplHealthLocked()
sm.notify(sm.target.TabletType, sm.terTimestamp, lag, err, sm.isServingLocked())
sm.hs.ChangeState(sm.target.TabletType, sm.terTimestamp, lag, err, sm.isServingLocked())
}

func (sm *stateManager) refreshReplHealthLocked() (time.Duration, error) {
Expand Down
83 changes: 54 additions & 29 deletions go/vt/vttablet/tabletserver/state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
Expand Down Expand Up @@ -60,6 +62,7 @@ func TestStateManagerStateByName(t *testing.T) {

func TestStateManagerServeMaster(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
sm.EnterLameduck()
err := sm.SetServingType(topodatapb.TabletType_MASTER, testNow, StateServing, "")
require.NoError(t, err)
Expand Down Expand Up @@ -88,6 +91,7 @@ func TestStateManagerServeMaster(t *testing.T) {

func TestStateManagerServeNonMaster(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "")
require.NoError(t, err)

Expand All @@ -109,6 +113,7 @@ func TestStateManagerServeNonMaster(t *testing.T) {

func TestStateManagerUnserveMaster(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
err := sm.SetServingType(topodatapb.TabletType_MASTER, testNow, StateNotServing, "")
require.NoError(t, err)

Expand All @@ -132,6 +137,7 @@ func TestStateManagerUnserveMaster(t *testing.T) {

func TestStateManagerUnserveNonmaster(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
err := sm.SetServingType(topodatapb.TabletType_RDONLY, testNow, StateNotServing, "")
require.NoError(t, err)

Expand All @@ -156,6 +162,7 @@ func TestStateManagerUnserveNonmaster(t *testing.T) {

func TestStateManagerClose(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
err := sm.SetServingType(topodatapb.TabletType_RDONLY, testNow, StateNotConnected, "")
require.NoError(t, err)

Expand All @@ -177,6 +184,7 @@ func TestStateManagerClose(t *testing.T) {

func TestStateManagerStopService(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "")
require.NoError(t, err)

Expand All @@ -190,6 +198,7 @@ func TestStateManagerStopService(t *testing.T) {

func TestStateManagerGracePeriod(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
sm.transitionGracePeriod = 10 * time.Millisecond

alsoAllow := func() topodatapb.TabletType {
Expand Down Expand Up @@ -240,6 +249,8 @@ func (te *testWatcher) Close() {
}

func TestStateManagerSetServingTypeRace(t *testing.T) {
// We don't call StopService because that in turn
// will call Close again on testWatcher.
sm := newTestStateManager(t)
te := &testWatcher{
t: t,
Expand All @@ -258,7 +269,9 @@ func TestStateManagerSetServingTypeRace(t *testing.T) {
}

func TestStateManagerSetServingTypeNoChange(t *testing.T) {
log.Infof("starting")
sm := newTestStateManager(t)
defer sm.StopService()
err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "")
require.NoError(t, err)

Expand Down Expand Up @@ -286,6 +299,7 @@ func TestStateManagerTransitionFailRetry(t *testing.T) {
transitionRetryInterval = 10 * time.Millisecond

sm := newTestStateManager(t)
defer sm.StopService()
sm.se.(*testSchemaEngine).failMySQL = true

err := sm.SetServingType(topodatapb.TabletType_MASTER, testNow, StateServing, "")
Expand Down Expand Up @@ -317,6 +331,7 @@ func TestStateManagerTransitionFailRetry(t *testing.T) {

func TestStateManagerNotConnectedType(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
sm.EnterLameduck()
err := sm.SetServingType(topodatapb.TabletType_RESTORE, testNow, StateNotServing, "")
require.NoError(t, err)
Expand All @@ -336,6 +351,7 @@ func TestStateManagerCheckMySQL(t *testing.T) {
transitionRetryInterval = 10 * time.Millisecond

sm := newTestStateManager(t)
defer sm.StopService()

err := sm.SetServingType(topodatapb.TabletType_MASTER, testNow, StateServing, "")
require.NoError(t, err)
Expand Down Expand Up @@ -442,6 +458,7 @@ func TestStateManagerValidations(t *testing.T) {

func TestStateManagerWaitForRequests(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
target := &querypb.Target{TabletType: topodatapb.TabletType_MASTER}
sm.target = *target
sm.timebombDuration = 10 * time.Second
Expand Down Expand Up @@ -481,40 +498,46 @@ func TestStateManagerWaitForRequests(t *testing.T) {

func TestStateManagerNotify(t *testing.T) {
sm := newTestStateManager(t)
var (
gotType topodatapb.TabletType
gotts time.Time
gotlag time.Duration
goterr error
gotServing bool
)

ch := make(chan struct{})
sm.notify = func(tabletType topodatapb.TabletType, terTimestamp time.Time, lag time.Duration, err error, serving bool) {
gotType = tabletType
gotts = terTimestamp
gotlag = lag
goterr = err
gotServing = serving
ch <- struct{}{}
}
defer sm.StopService()

blpFunc = testBlpFunc

err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "")
require.NoError(t, err)

assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType)
assert.Equal(t, StateServing, sm.state)

<-ch
ch := make(chan *querypb.StreamHealthResponse, 5)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := sm.hs.Stream(context.Background(), func(shr *querypb.StreamHealthResponse) error {
ch <- shr
return nil
})
assert.Contains(t, err.Error(), "tabletserver is shutdown")
}()
defer wg.Wait()

sm.Broadcast()

gotshr := <-ch
// Remove things we don't care about:
gotshr.RealtimeStats = nil
wantshr := &querypb.StreamHealthResponse{
Target: &querypb.Target{
TabletType: topodatapb.TabletType_REPLICA,
},
Serving: true,
TabletAlias: &topodatapb.TabletAlias{},
}
sm.hcticks.Stop()
assert.Equal(t, topodatapb.TabletType_REPLICA, gotType)
assert.Equal(t, testNow, gotts)
assert.Equal(t, 1*time.Second, gotlag)
assert.Equal(t, nil, goterr)
assert.True(t, gotServing)
assert.Equal(t, wantshr, gotshr)
sm.StopService()
}

func TestRefreshReplHealthLocked(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
rt := sm.rt.(*testReplTracker)

sm.target.TabletType = topodatapb.TabletType_MASTER
Expand Down Expand Up @@ -555,7 +578,10 @@ func verifySubcomponent(t *testing.T, order int64, component interface{}, state

func newTestStateManager(t *testing.T) *stateManager {
order.Set(0)
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, "StateManagerTest")
sm := &stateManager{
hs: newHealthStreamer(env, topodatapb.TabletAlias{}),
se: &testSchemaEngine{},
rt: &testReplTracker{lag: 1 * time.Second},
vstreamer: &testSubcomponent{},
Expand All @@ -565,11 +591,10 @@ func newTestStateManager(t *testing.T) *stateManager {
txThrottler: &testTxThrottler{},
te: &testTxEngine{},
messager: &testSubcomponent{},
notify: func(topodatapb.TabletType, time.Time, time.Duration, error, bool) {},
}
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, "StateManagerTest")
sm.Init(env, querypb.Target{})
sm.hs.InitDBConfig(querypb.Target{})
log.Infof("returning sm: %p", sm)
return sm
}

Expand Down
Loading