diff --git a/go/vt/vtctld/tablet_data_test.go b/go/vt/vtctld/tablet_data_test.go index 153bc68fb11..edc69d985a1 100644 --- a/go/vt/vtctld/tablet_data_test.go +++ b/go/vt/vtctld/tablet_data_test.go @@ -22,10 +22,11 @@ import ( "testing" "time" - "golang.org/x/net/context" - "github.com/golang/protobuf/proto" + "golang.org/x/net/context" "vitess.io/vitess/go/vt/logutil" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/vttablet/grpcqueryservice" "vitess.io/vitess/go/vt/vttablet/queryservice" @@ -33,9 +34,6 @@ import ( "vitess.io/vitess/go/vt/vttablet/tmclient" "vitess.io/vitess/go/vt/wrangler" "vitess.io/vitess/go/vt/wrangler/testlib" - - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) // streamHealthTabletServer is a local QueryService implementation to support the tests @@ -91,7 +89,7 @@ func (s *streamHealthTabletServer) streamHealthUnregister(id int) error { } // BroadcastHealth will broadcast the current health to all listeners -func (s *streamHealthTabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats) { +func (s *streamHealthTabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats, maxCache time.Duration) { shr := &querypb.StreamHealthResponse{ TabletExternallyReparentedTimestamp: terTimestamp, RealtimeStats: stats, @@ -137,7 +135,7 @@ func TestTabletData(t *testing.T) { case <-stop: return default: - shsq.BroadcastHealth(42, stats) + shsq.BroadcastHealth(42, stats, time.Minute) } } }() diff --git a/go/vt/vttablet/endtoend/misc_test.go b/go/vt/vttablet/endtoend/misc_test.go index 4544ac0bc31..2f12bf0a063 100644 --- a/go/vt/vttablet/endtoend/misc_test.go +++ b/go/vt/vttablet/endtoend/misc_test.go @@ -29,17 +29,15 @@ import ( "github.com/golang/protobuf/proto" "golang.org/x/net/context" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/log" + querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet/endtoend/framework" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" - - querypb "vitess.io/vitess/go/vt/proto/query" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) func TestSimpleRead(t *testing.T) { @@ -453,7 +451,7 @@ func TestHealth(t *testing.T) { func TestStreamHealth(t *testing.T) { var health *querypb.StreamHealthResponse - framework.Server.BroadcastHealth(0, nil) + framework.Server.BroadcastHealth(0, nil, time.Minute) if err := framework.Server.StreamHealth(context.Background(), func(shr *querypb.StreamHealthResponse) error { health = shr return io.EOF @@ -465,6 +463,23 @@ func TestStreamHealth(t *testing.T) { } } +func TestStreamHealth_Expired(t *testing.T) { + var health *querypb.StreamHealthResponse + framework.Server.BroadcastHealth(0, nil, time.Millisecond) + time.Sleep(5 * time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond * 100) + defer cancel() + if err := framework.Server.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error { + health = shr + return io.EOF + }); err != nil { + t.Fatal(err) + } + if health != nil { + t.Errorf("Health: %v, want %v", health, nil) + } +} + func TestQueryStats(t *testing.T) { client := framework.NewClient() vstart := framework.DebugVars() diff --git a/go/vt/vttablet/tabletmanager/state_change.go b/go/vt/vttablet/tabletmanager/state_change.go index e2fa8dd6838..2e6a1a4076f 100644 --- a/go/vt/vttablet/tabletmanager/state_change.go +++ b/go/vt/vttablet/tabletmanager/state_change.go @@ -24,24 +24,21 @@ import ( "strings" "time" - "vitess.io/vitess/go/vt/vterrors" - "golang.org/x/net/context" - "vitess.io/vitess/go/event" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletmanager/events" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" - - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var ( @@ -105,6 +102,7 @@ func (agent *ActionAgent) broadcastHealth() { replicationDelay := agent._replicationDelay healthError := agent._healthy terTime := agent._tabletExternallyReparentedTime + healthyTime := agent._healthyTime agent.mutex.Unlock() // send it to our observers @@ -116,12 +114,17 @@ func (agent *ActionAgent) broadcastHealth() { stats.Qps = tabletenv.QPSRates.TotalRate() if healthError != nil { stats.HealthError = healthError.Error() + } else { + timeSinceLastCheck := time.Since(healthyTime) + if timeSinceLastCheck > *healthCheckInterval*3 { + stats.HealthError = fmt.Sprintf("last health check is too old: %s > %s", timeSinceLastCheck, *healthCheckInterval*3) + } } var ts int64 if !terTime.IsZero() { ts = terTime.Unix() } - go agent.QueryServiceControl.BroadcastHealth(ts, stats) + go agent.QueryServiceControl.BroadcastHealth(ts, stats, *healthCheckInterval*3) } // refreshTablet needs to be run after an action may have changed the current diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index afffe54b661..2d7a8c10fce 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -76,7 +76,7 @@ type Controller interface { SchemaEngine() *schema.Engine // BroadcastHealth sends the current health to all listeners - BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats) + BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats, maxCache time.Duration) // HeartbeatLag returns the current lag as calculated by the heartbeat // package, if heartbeat is enabled. Otherwise returns 0. diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 491c1cf604a..6993f1b7ebc 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -30,7 +30,6 @@ import ( "time" "golang.org/x/net/context" - "vitess.io/vitess/go/acl" "vitess.io/vitess/go/history" "vitess.io/vitess/go/mysql" @@ -44,6 +43,10 @@ import ( "vitess.io/vitess/go/vt/dbconnpool" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/tableacl" @@ -61,11 +64,6 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/txserializer" "vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" - - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) const ( @@ -184,10 +182,11 @@ type TabletServer struct { topoServer *topo.Server // streamHealthMutex protects all the following fields - streamHealthMutex sync.Mutex - streamHealthIndex int - streamHealthMap map[int]chan<- *querypb.StreamHealthResponse - lastStreamHealthResponse *querypb.StreamHealthResponse + streamHealthMutex sync.Mutex + streamHealthIndex int + streamHealthMap map[int]chan<- *querypb.StreamHealthResponse + lastStreamHealthResponse *querypb.StreamHealthResponse + lastStreamHealthExpiration time.Time // history records changes in state for display on the status page. // It has its own internal mutex. @@ -1823,9 +1822,10 @@ func createSplitQueryAlgorithmObject( func (tsv *TabletServer) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { tsv.streamHealthMutex.Lock() shr := tsv.lastStreamHealthResponse + shrExpiration := tsv.lastStreamHealthExpiration tsv.streamHealthMutex.Unlock() // Send current state immediately. - if shr != nil { + if shr != nil && time.Now().Before(shrExpiration) { if err := callback(shr); err != nil { if err == io.EOF { return nil @@ -1871,14 +1871,14 @@ func (tsv *TabletServer) streamHealthUnregister(id int) { } // BroadcastHealth will broadcast the current health to all listeners -func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats) { +func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats, maxCache time.Duration) { tsv.mu.Lock() target := tsv.target tsv.mu.Unlock() shr := &querypb.StreamHealthResponse{ - Target: &target, - TabletAlias: &tsv.alias, - Serving: tsv.IsServing(), + Target: &target, + TabletAlias: &tsv.alias, + Serving: tsv.IsServing(), TabletExternallyReparentedTimestamp: terTimestamp, RealtimeStats: stats, } @@ -1893,6 +1893,7 @@ func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.Real } } tsv.lastStreamHealthResponse = shr + tsv.lastStreamHealthExpiration = time.Now().Add(maxCache) } // HeartbeatLag returns the current lag as calculated by the heartbeat diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go index d5e906240d3..f6f7ee99649 100644 --- a/go/vt/vttablet/tabletservermock/controller.go +++ b/go/vt/vttablet/tabletservermock/controller.go @@ -187,7 +187,7 @@ func (tqsc *Controller) SchemaEngine() *schema.Engine { } // BroadcastHealth is part of the tabletserver.Controller interface -func (tqsc *Controller) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats) { +func (tqsc *Controller) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats, maxCache time.Duration) { tqsc.mu.Lock() defer tqsc.mu.Unlock()