Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions go/vt/vtctld/tablet_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,18 @@ import (
"testing"
"time"

"golang.org/x/net/context"

"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"vitess.io/vitess/go/vt/logutil"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vttablet/grpcqueryservice"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/queryservice/fakes"
"vitess.io/vitess/go/vt/vttablet/tmclient"
"vitess.io/vitess/go/vt/wrangler"
"vitess.io/vitess/go/vt/wrangler/testlib"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// streamHealthTabletServer is a local QueryService implementation to support the tests
Expand Down Expand Up @@ -91,7 +89,7 @@ func (s *streamHealthTabletServer) streamHealthUnregister(id int) error {
}

// BroadcastHealth will broadcast the current health to all listeners
func (s *streamHealthTabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats) {
func (s *streamHealthTabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats, maxCache time.Duration) {
shr := &querypb.StreamHealthResponse{
TabletExternallyReparentedTimestamp: terTimestamp,
RealtimeStats: stats,
Expand Down Expand Up @@ -137,7 +135,7 @@ func TestTabletData(t *testing.T) {
case <-stop:
return
default:
shsq.BroadcastHealth(42, stats)
shsq.BroadcastHealth(42, stats, time.Minute)
}
}
}()
Expand Down
25 changes: 20 additions & 5 deletions go/vt/vttablet/endtoend/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,15 @@ import (

"github.com/golang/protobuf/proto"
"golang.org/x/net/context"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

func TestSimpleRead(t *testing.T) {
Expand Down Expand Up @@ -453,7 +451,7 @@ func TestHealth(t *testing.T) {

func TestStreamHealth(t *testing.T) {
var health *querypb.StreamHealthResponse
framework.Server.BroadcastHealth(0, nil)
framework.Server.BroadcastHealth(0, nil, time.Minute)
if err := framework.Server.StreamHealth(context.Background(), func(shr *querypb.StreamHealthResponse) error {
health = shr
return io.EOF
Expand All @@ -465,6 +463,23 @@ func TestStreamHealth(t *testing.T) {
}
}

func TestStreamHealth_Expired(t *testing.T) {
var health *querypb.StreamHealthResponse
framework.Server.BroadcastHealth(0, nil, time.Millisecond)
time.Sleep(5 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond * 100)
defer cancel()
if err := framework.Server.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
health = shr
return io.EOF
}); err != nil {
t.Fatal(err)
}
if health != nil {
t.Errorf("Health: %v, want %v", health, nil)
}
}

func TestQueryStats(t *testing.T) {
client := framework.NewClient()
vstart := framework.DebugVars()
Expand Down
17 changes: 10 additions & 7 deletions go/vt/vttablet/tabletmanager/state_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,21 @@ import (
"strings"
"time"

"vitess.io/vitess/go/vt/vterrors"

"golang.org/x/net/context"

"vitess.io/vitess/go/event"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/events"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

var (
Expand Down Expand Up @@ -105,6 +102,7 @@ func (agent *ActionAgent) broadcastHealth() {
replicationDelay := agent._replicationDelay
healthError := agent._healthy
terTime := agent._tabletExternallyReparentedTime
healthyTime := agent._healthyTime
agent.mutex.Unlock()

// send it to our observers
Expand All @@ -116,12 +114,17 @@ func (agent *ActionAgent) broadcastHealth() {
stats.Qps = tabletenv.QPSRates.TotalRate()
if healthError != nil {
stats.HealthError = healthError.Error()
} else {
timeSinceLastCheck := time.Since(healthyTime)
if timeSinceLastCheck > *healthCheckInterval*3 {
stats.HealthError = fmt.Sprintf("last health check is too old: %s > %s", timeSinceLastCheck, *healthCheckInterval*3)
}
}
var ts int64
if !terTime.IsZero() {
ts = terTime.Unix()
}
go agent.QueryServiceControl.BroadcastHealth(ts, stats)
go agent.QueryServiceControl.BroadcastHealth(ts, stats, *healthCheckInterval*3)
}

// refreshTablet needs to be run after an action may have changed the current
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Controller interface {
SchemaEngine() *schema.Engine

// BroadcastHealth sends the current health to all listeners
BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats)
BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats, maxCache time.Duration)

// HeartbeatLag returns the current lag as calculated by the heartbeat
// package, if heartbeat is enabled. Otherwise returns 0.
Expand Down
31 changes: 16 additions & 15 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"time"

"golang.org/x/net/context"

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/history"
"vitess.io/vitess/go/mysql"
Expand All @@ -44,6 +43,10 @@ import (
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/tableacl"
Expand All @@ -61,11 +64,6 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/txserializer"
"vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
Expand Down Expand Up @@ -184,10 +182,11 @@ type TabletServer struct {
topoServer *topo.Server

// streamHealthMutex protects all the following fields
streamHealthMutex sync.Mutex
streamHealthIndex int
streamHealthMap map[int]chan<- *querypb.StreamHealthResponse
lastStreamHealthResponse *querypb.StreamHealthResponse
streamHealthMutex sync.Mutex
streamHealthIndex int
streamHealthMap map[int]chan<- *querypb.StreamHealthResponse
lastStreamHealthResponse *querypb.StreamHealthResponse
lastStreamHealthExpiration time.Time

// history records changes in state for display on the status page.
// It has its own internal mutex.
Expand Down Expand Up @@ -1823,9 +1822,10 @@ func createSplitQueryAlgorithmObject(
func (tsv *TabletServer) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
tsv.streamHealthMutex.Lock()
shr := tsv.lastStreamHealthResponse
shrExpiration := tsv.lastStreamHealthExpiration
tsv.streamHealthMutex.Unlock()
// Send current state immediately.
if shr != nil {
if shr != nil && time.Now().Before(shrExpiration) {
if err := callback(shr); err != nil {
if err == io.EOF {
return nil
Expand Down Expand Up @@ -1871,14 +1871,14 @@ func (tsv *TabletServer) streamHealthUnregister(id int) {
}

// BroadcastHealth will broadcast the current health to all listeners
func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats) {
func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats, maxCache time.Duration) {
tsv.mu.Lock()
target := tsv.target
tsv.mu.Unlock()
shr := &querypb.StreamHealthResponse{
Target: &target,
TabletAlias: &tsv.alias,
Serving: tsv.IsServing(),
Target: &target,
TabletAlias: &tsv.alias,
Serving: tsv.IsServing(),
TabletExternallyReparentedTimestamp: terTimestamp,
RealtimeStats: stats,
}
Expand All @@ -1893,6 +1893,7 @@ func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.Real
}
}
tsv.lastStreamHealthResponse = shr
tsv.lastStreamHealthExpiration = time.Now().Add(maxCache)
}

// HeartbeatLag returns the current lag as calculated by the heartbeat
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletservermock/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (tqsc *Controller) SchemaEngine() *schema.Engine {
}

// BroadcastHealth is part of the tabletserver.Controller interface
func (tqsc *Controller) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats) {
func (tqsc *Controller) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats, maxCache time.Duration) {
tqsc.mu.Lock()
defer tqsc.mu.Unlock()

Expand Down