Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 6 additions & 104 deletions go/vt/discovery/tablet_stats_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package discovery

import (
"math"
"sync"

"golang.org/x/net/context"
Expand All @@ -27,7 +26,6 @@ import (
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
)

// TabletStatsCache is a HealthCheckStatsListener that keeps both the
Expand All @@ -47,8 +45,6 @@ type TabletStatsCache struct {
cell string
// ts is the topo server in use.
ts *topo.Server
// aggregatesChan is used to send notifications to listeners.
aggregatesChan chan []*srvtopo.TargetStatsEntry
// mu protects the following fields. It does not protect individual
// entries in the entries map.
mu sync.RWMutex
Expand All @@ -70,8 +66,6 @@ type tabletStatsCacheEntry struct {
all map[string]*TabletStats
// healthy only has the healthy ones.
healthy []*TabletStats
// aggregates has the per-alias aggregates.
aggregates map[string]*querypb.AggregateStats
}

func (e *tabletStatsCacheEntry) updateHealthyMapForMaster(ts *TabletStats) {
Expand Down Expand Up @@ -131,12 +125,11 @@ func NewTabletStatsCacheDoNotSetListener(ts *topo.Server, cell string) *TabletSt

func newTabletStatsCache(hc HealthCheck, ts *topo.Server, cell string, setListener bool) *TabletStatsCache {
tc := &TabletStatsCache{
cell: cell,
ts: ts,
aggregatesChan: make(chan []*srvtopo.TargetStatsEntry, 100),
entries: make(map[string]map[string]map[topodatapb.TabletType]*tabletStatsCacheEntry),
tsm: srvtopo.NewTargetStatsMultiplexer(),
cellAliases: make(map[string]string),
cell: cell,
ts: ts,
entries: make(map[string]map[string]map[topodatapb.TabletType]*tabletStatsCacheEntry),
tsm: srvtopo.NewTargetStatsMultiplexer(),
cellAliases: make(map[string]string),
}

if setListener {
Expand Down Expand Up @@ -188,8 +181,7 @@ func (tc *TabletStatsCache) getOrCreateEntry(target *querypb.Target) *tabletStat
e, ok := t[target.TabletType]
if !ok {
e = &tabletStatsCacheEntry{
all: make(map[string]*TabletStats),
aggregates: make(map[string]*querypb.AggregateStats),
all: make(map[string]*TabletStats),
}
t[target.TabletType] = e
}
Expand Down Expand Up @@ -258,9 +250,6 @@ func (tc *TabletStatsCache) StatsUpdate(ts *TabletStats) {
// The healthy list is different for TabletType_MASTER: we
// only keep the most recent one.
e.updateHealthyMapForMaster(ts)
for _, s := range e.all {
allArray = append(allArray, s)
}
} else {
// For non-master, if it is a trivial update,
// we just skip everything else. We don't even update the
Expand All @@ -276,45 +265,6 @@ func (tc *TabletStatsCache) StatsUpdate(ts *TabletStats) {
}
e.healthy = FilterByReplicationLag(allArray)
}

tc.updateAggregateMap(ts.Target.Keyspace, ts.Target.Shard, ts.Target.TabletType, e, allArray)
}

// makeAggregateMap takes a list of TabletStats and builds a per-alias
// AggregateStats map.
func (tc *TabletStatsCache) makeAggregateMap(stats []*TabletStats) map[string]*querypb.AggregateStats {
result := make(map[string]*querypb.AggregateStats)
for _, ts := range stats {
alias := tc.getAliasByCell(ts.Tablet.Alias.Cell)
agg, ok := result[alias]
if !ok {
agg = &querypb.AggregateStats{
SecondsBehindMasterMin: math.MaxUint32,
}
result[alias] = agg
}

if ts.Serving && ts.LastError == nil {
agg.HealthyTabletCount++
if ts.Stats.SecondsBehindMaster < agg.SecondsBehindMasterMin {
agg.SecondsBehindMasterMin = ts.Stats.SecondsBehindMaster
}
if ts.Stats.SecondsBehindMaster > agg.SecondsBehindMasterMax {
agg.SecondsBehindMasterMax = ts.Stats.SecondsBehindMaster
}
} else {
agg.UnhealthyTabletCount++
}
}
return result
}

// updateAggregateMap will update the aggregate map for the
// tabletStatsCacheEntry. It may broadcast the changes too if we have listeners.
// e.mu needs to be locked.
func (tc *TabletStatsCache) updateAggregateMap(keyspace, shard string, tabletType topodatapb.TabletType, e *tabletStatsCacheEntry, stats []*TabletStats) {
// Save the new value
e.aggregates = tc.makeAggregateMap(stats)
}

// GetTabletStats returns the full list of available targets.
Expand Down Expand Up @@ -361,53 +311,5 @@ func (tc *TabletStatsCache) ResetForTesting() {
tc.entries = make(map[string]map[string]map[topodatapb.TabletType]*tabletStatsCacheEntry)
}

// GetAggregateStats is part of the TargetStatsListener interface.
func (tc *TabletStatsCache) GetAggregateStats(target *querypb.Target) (*querypb.AggregateStats, error) {
e := tc.getEntry(target.Keyspace, target.Shard, target.TabletType)
if e == nil {
return nil, topo.NewError(topo.NoNode, topotools.TargetIdent(target))
}

e.mu.RLock()
defer e.mu.RUnlock()
if target.TabletType == topodatapb.TabletType_MASTER {
if len(e.aggregates) == 0 {
return nil, topo.NewError(topo.NoNode, topotools.TargetIdent(target))
}
for _, agg := range e.aggregates {
return agg, nil
}
}
targetAlias := tc.getAliasByCell(target.Cell)
agg, ok := e.aggregates[targetAlias]
if !ok {
return nil, topo.NewError(topo.NoNode, topotools.TargetIdent(target))
}
return agg, nil
}

// GetMasterCell is part of the TargetStatsListener interface.
func (tc *TabletStatsCache) GetMasterCell(keyspace, shard string) (cell string, err error) {
e := tc.getEntry(keyspace, shard, topodatapb.TabletType_MASTER)
if e == nil {
return "", topo.NewError(topo.NoNode, topotools.TargetIdent(&querypb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: topodatapb.TabletType_MASTER,
}))
}

e.mu.RLock()
defer e.mu.RUnlock()
for cell := range e.aggregates {
return cell, nil
}
return "", topo.NewError(topo.NoNode, topotools.TargetIdent(&querypb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: topodatapb.TabletType_MASTER,
}))
}

// Compile-time interface check.
var _ HealthCheckStatsListener = (*TabletStatsCache)(nil)
Loading