Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,11 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) {
return
}
delete(ths, tabletAlias)
// delete from healthy list
healthy, ok := hc.healthy[key]
if ok && len(healthy) > 0 {
hc.recomputeHealthy(key)
}
}

func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, shr *query.StreamHealthResponse, currentTarget *query.Target, trivialNonMasterUpdate bool, isMasterUpdate bool, isMasterChange bool) {
Expand Down Expand Up @@ -446,27 +451,11 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, shr *query.StreamHealt
// Tablets from other cells for non-master targets should not trigger a re-sort;
// they should also be excluded from healthy list.
if shr.Target.TabletType != topodata.TabletType_MASTER && hc.isIncluded(shr.Target.TabletType, shr.TabletAlias) {
all := hc.healthData[targetKey]
allArray := make([]*TabletHealth, 0, len(all))
for _, s := range all {
// Only tablets in same cell / cellAlias are included in healthy list.
if hc.isIncluded(s.Tablet.Type, s.Tablet.Alias) {
allArray = append(allArray, s)
}
}
hc.healthy[targetKey] = FilterStatsByReplicationLag(allArray)
hc.recomputeHealthy(targetKey)
}
if targetChanged && currentTarget.TabletType != topodata.TabletType_MASTER && hc.isIncluded(shr.Target.TabletType, shr.TabletAlias) { // also recompute old target's healthy list
oldTargetKey := hc.keyFromTarget(currentTarget)
all := hc.healthData[oldTargetKey]
allArray := make([]*TabletHealth, 0, len(all))
for _, s := range all {
// Only tablets in same cell / cellAlias are included in healthy list.
if hc.isIncluded(s.Tablet.Type, s.Tablet.Alias) {
allArray = append(allArray, s)
}
}
hc.healthy[oldTargetKey] = FilterStatsByReplicationLag(allArray)
hc.recomputeHealthy(oldTargetKey)
}
}
if isMasterChange {
Expand All @@ -478,6 +467,18 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, shr *query.StreamHealt

}

func (hc *HealthCheckImpl) recomputeHealthy(key keyspaceShardTabletType) {
all := hc.healthData[key]
allArray := make([]*TabletHealth, 0, len(all))
for _, s := range all {
// Only tablets in same cell / cellAlias are included in healthy list.
if hc.isIncluded(s.Tablet.Type, s.Tablet.Alias) {
allArray = append(allArray, s)
}
}
hc.healthy[key] = FilterStatsByReplicationLag(allArray)
}

// Subscribe adds a listener. Used by vtgate buffer to learn about master changes.
func (hc *HealthCheckImpl) Subscribe() chan *TabletHealth {
hc.subMu.Lock()
Expand Down
42 changes: 42 additions & 0 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,48 @@ func TestWaitForAllServingTablets(t *testing.T) {
KeyspacesToWatch = []string{}
}

// TestRemoveTablet tests the behavior when a tablet goes away.
func TestRemoveTablet(t *testing.T) {
ts := memorytopo.NewServer("cell")
hc := createTestHc(ts)
defer hc.Close()
tablet := createTestTablet(0, "cell", "a")
tablet.Type = topodatapb.TabletType_REPLICA
input := make(chan *querypb.StreamHealthResponse)
createFakeConn(tablet, input)

// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
hc.AddTablet(tablet)
// there will be a first result, get and discard it
<-resultChan

shr := &querypb.StreamHealthResponse{
TabletAlias: tablet.Alias,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
TabletExternallyReparentedTimestamp: 0,
RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2},
}
want := []*TabletHealth{{
Tablet: tablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2},
MasterTermStartTime: 0,
}}
input <- shr
<-resultChan
// check it's there
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA})
mustMatch(t, want, a, "unexpected result")

// delete the tablet
hc.RemoveTablet(tablet)
a = hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA})
assert.Empty(t, a, "wrong result, expected empty list")
}

// TestGetHealthyTablets tests the functionality of GetHealthyTabletStats.
func TestGetHealthyTablets(t *testing.T) {
ts := memorytopo.NewServer("cell")
Expand Down