diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 0719d90308d..7fde2774bd4 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -53,6 +53,7 @@ import ( "golang.org/x/net/context" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo/topoproto" @@ -204,6 +205,12 @@ func (e *TabletStats) DeepEqual(f *TabletStats) bool { (e.LastError != nil && f.LastError != nil && e.LastError.Error() == f.LastError.Error())) } +// Copy produces a shallow copy of TabletStats. +func (e *TabletStats) Copy() *TabletStats { + ts := *e + return &ts +} + // GetTabletHostPort formats a tablet host port address. func (e TabletStats) GetTabletHostPort() string { vtPort := e.Tablet.PortMap["vt"] @@ -289,49 +296,58 @@ type HealthCheck interface { Close() error } -// healthCheckConn contains details about a tablet. -// It is used internally by HealthCheckImpl to keep all the info -// about a tablet. -type healthCheckConn struct { - // set at construction time - ctx context.Context - cancelFunc context.CancelFunc - - // mu protects all the following fields. - // When locking both mutex from HealthCheck and healthCheckConn, - // HealthCheck.mu goes first. - // Note tabletStats.Tablet and tabletStats.Name are immutable. - mu sync.RWMutex - conn queryservice.QueryService - streamCancelFunc context.CancelFunc - tabletStats TabletStats - loggedServingState bool - lastResponseTimestamp time.Time // timestamp of the last healthcheck response -} - // HealthCheckImpl performs health checking and notifies downstream components about any changes. +// It contains a map of tabletHealth objects, each of which stores the health information for +// a tablet. A checkConn goroutine is spawned for each tabletHealth, which is responsible for +// keeping that tabletHealth up-to-date. This is done through callbacks to updateHealth. 
+// If checkConn terminates for any reason, it sets tabletHealth.latestTabletStats.Up to false. If a tabletHealth +// gets removed from the map, its cancelFunc gets called, which ensures that the associated +// checkConn goroutine eventually terminates. type HealthCheckImpl struct { // Immutable fields set at construction time. listener HealthCheckStatsListener sendDownEvents bool retryDelay time.Duration healthCheckTimeout time.Duration - closeChan chan struct{} // signals the process gorouting to terminate - // wg keeps track of all launched Go routines. - wg sync.WaitGroup + // connsWG keeps track of all launched Go routines that monitor tablet connections. + connsWG sync.WaitGroup // mu protects all the following fields. - // When locking both mutex from HealthCheck and healthCheckConn, - // HealthCheck.mu goes first. - mu sync.RWMutex + mu sync.Mutex - // addrToConns maps from address to the healthCheckConn object. - addrToConns map[string]*healthCheckConn + // addrToHealth maps from address to tabletHealth. + addrToHealth map[string]*tabletHealth // Wait group that's used to wait until all initial StatsUpdate() calls are made after the AddTablet() calls. initialUpdatesWG sync.WaitGroup } +// healthCheckConn is a structure that lives within the scope of +// the checkConn goroutine to maintain its internal state. Therefore, +// it does not require synchronization. Changes that are relevant to +// healthcheck are transmitted through calls to HealthCheckImpl.updateHealth. +// TODO(sougou): move this and associated functions to a separate file. +type healthCheckConn struct { + ctx context.Context + + conn queryservice.QueryService + tabletStats TabletStats + loggedServingState bool + lastResponseTimestamp time.Time // timestamp of the last healthcheck response +} + +// tabletHealth maintains the health status of a tablet. A map of this +// structure is maintained in HealthCheckImpl. +type tabletHealth struct { + // cancelFunc must be called before discarding tabletHealth. 
+ // This will ensure that the associated checkConn goroutine will terminate. + cancelFunc context.CancelFunc + // conn is the connection associated with the tablet. + conn queryservice.QueryService + // latestTabletStats stores the latest health stats of the tablet. + latestTabletStats TabletStats +} + // NewDefaultHealthCheck creates a new HealthCheck object with a default configuration. func NewDefaultHealthCheck() HealthCheck { return NewHealthCheck(DefaultHealthCheckRetryDelay, DefaultHealthCheckTimeout) @@ -348,37 +364,11 @@ func NewDefaultHealthCheck() HealthCheck { // not healthy. func NewHealthCheck(retryDelay, healthCheckTimeout time.Duration) HealthCheck { hc := &HealthCheckImpl{ - addrToConns: make(map[string]*healthCheckConn), + addrToHealth: make(map[string]*tabletHealth), retryDelay: retryDelay, healthCheckTimeout: healthCheckTimeout, - closeChan: make(chan struct{}), } - hc.wg.Add(1) - go func() { - defer hc.wg.Done() - // Start another go routine to check timeout. - // Currently vttablet sends healthcheck response every 20 seconds. - // We set the default timeout to 1 minute (20s * 3), - // and also perform the timeout check in sync with vttablet frequency. - // When we change the healthcheck frequency on vttablet, - // we should also adjust here. - t := time.NewTicker(healthCheckTimeout / 3) - defer t.Stop() - for { - select { - case <-hc.closeChan: - return - case _, ok := <-t.C: - if !ok { - // the ticker stoped - return - } - hc.checkHealthCheckTimeout() - } - } - }() - healthcheckOnce.Do(func() { http.Handle("/debug/gateway", hc) }) @@ -418,16 +408,13 @@ func (hc *HealthCheckImpl) ServeHTTP(w http.ResponseWriter, _ *http.Request) { // servingConnStats returns the number of serving tablets per keyspace/shard/tablet type. 
func (hc *HealthCheckImpl) servingConnStats() map[string]int64 { res := make(map[string]int64) - hc.mu.RLock() - defer hc.mu.RUnlock() - for _, hcc := range hc.addrToConns { - hcc.mu.RLock() - if !hcc.tabletStats.Up || !hcc.tabletStats.Serving || hcc.tabletStats.LastError != nil { - hcc.mu.RUnlock() + hc.mu.Lock() + defer hc.mu.Unlock() + for _, th := range hc.addrToHealth { + if !th.latestTabletStats.Up || !th.latestTabletStats.Serving || th.latestTabletStats.LastError != nil { continue } - key := fmt.Sprintf("%s.%s.%s", hcc.tabletStats.Target.Keyspace, hcc.tabletStats.Target.Shard, topoproto.TabletTypeLString(hcc.tabletStats.Target.TabletType)) - hcc.mu.RUnlock() + key := fmt.Sprintf("%s.%s.%s", th.latestTabletStats.Target.Keyspace, th.latestTabletStats.Target.Shard, topoproto.TabletTypeLString(th.latestTabletStats.Target.TabletType)) res[key]++ } return res @@ -455,59 +442,139 @@ func (hc *HealthCheckImpl) stateChecksum() int64 { return int64(crc32.ChecksumIEEE(buf.Bytes())) } +// updateHealth updates the tabletHealth record and transmits the tablet stats +// to the listener. +func (hc *HealthCheckImpl) updateHealth(ts *TabletStats, conn queryservice.QueryService) { + // Unconditionally send the received update at the end. + defer func() { + if hc.listener != nil { + hc.listener.StatsUpdate(ts) + } + }() + + hc.mu.Lock() + th, ok := hc.addrToHealth[ts.Key] + if !ok { + // This can happen on delete because the entry is removed first, + // or if HealthCheckImpl has been closed. + hc.mu.Unlock() + return + } + oldts := th.latestTabletStats + th.latestTabletStats = *ts + th.conn = conn + hc.mu.Unlock() + + // In the case where a tablet changes type (but not for the + // initial message), we want to log it, and maybe advertise it too. 
+ if oldts.Target.TabletType != topodatapb.TabletType_UNKNOWN && oldts.Target.TabletType != ts.Target.TabletType { + // Log and maybe notify + log.Infof("HealthCheckUpdate(Type Change): %v, tablet: %s, target %+v => %+v, reparent time: %v", + oldts.Name, topotools.TabletIdent(oldts.Tablet), topotools.TargetIdent(oldts.Target), topotools.TargetIdent(ts.Target), ts.TabletExternallyReparentedTimestamp) + if hc.listener != nil && hc.sendDownEvents { + oldts.Up = false + hc.listener.StatsUpdate(&oldts) + } + + // Track how often a tablet gets promoted to master. It is used for + // comparing against the variables in go/vtgate/buffer/variables.go. + if oldts.Target.TabletType != topodatapb.TabletType_MASTER && ts.Target.TabletType == topodatapb.TabletType_MASTER { + hcMasterPromotedCounters.Add([]string{ts.Target.Keyspace, ts.Target.Shard}, 1) + } + } +} + // finalizeConn closes the health checking connection and sends the final // notification about the tablet to downstream. To be called only on exit from // checkConn(). func (hc *HealthCheckImpl) finalizeConn(hcc *healthCheckConn) { - hcc.mu.Lock() - hccConn := hcc.conn - hccCtx := hcc.ctx - hcc.conn = nil hcc.tabletStats.Up = false hcc.setServingState(false, "finalizeConn closing connection") // Note: checkConn() exits only when hcc.ctx.Done() is closed. Thus it's // safe to simply get Err() value here and assign to LastError. hcc.tabletStats.LastError = hcc.ctx.Err() - ts := hcc.tabletStats - hcc.mu.Unlock() - - if hccConn != nil { - hccConn.Close(hccCtx) - } - - if hc.listener != nil { - hc.listener.StatsUpdate(&ts) + hc.updateHealth(hcc.tabletStats.Copy(), nil) + if hcc.conn != nil { + // Don't use hcc.ctx because it's already closed. + // Use a separate context, and add a timeout to prevent unbounded waits. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + hcc.conn.Close(ctx) + hcc.conn = nil } } // checkConn performs health checking on the given tablet. 
func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) { - defer hc.wg.Done() + defer hc.connsWG.Done() defer hc.finalizeConn(hcc) // Initial notification for downstream about the tablet existence. - hcc.mu.Lock() - ts := hcc.tabletStats - hcc.mu.Unlock() - if hc.listener != nil { - hc.listener.StatsUpdate(&ts) - } + hc.updateHealth(hcc.tabletStats.Copy(), hcc.conn) hc.initialUpdatesWG.Done() retryDelay := hc.retryDelay for { - ctx, cancel := context.WithCancel(hcc.ctx) - hcc.mu.Lock() - hcc.streamCancelFunc = cancel - hcc.mu.Unlock() + streamCtx, streamCancel := context.WithCancel(hcc.ctx) + + // Setup a watcher that restarts the timer every time an update is received. + // If a timeout occurs for a serving tablet, we make it non-serving and send + // a status update. The stream is also terminated so it can be retried. + // servingStatus feeds into the serving var, which keeps track of the serving + // status transmitted by the tablet. + servingStatus := make(chan bool, 1) + // timedout is accessed atomically because there could be a race + // between the goroutine that sets it and the check for its value + // later. + timedout := sync2.NewAtomicBool(false) + serving := hcc.tabletStats.Serving + go func() { + for { + select { + case serving = <-servingStatus: + continue + case <-time.After(hc.healthCheckTimeout): + // Ignore if not serving. + if !serving { + continue + } + timedout.Set(true) + streamCancel() + return + case <-streamCtx.Done(): + // If stream returns while serving is false, the function + // will get stuck in an infinite loop. This code path + // breaks the loop. + return + } + } + }() // Read stream health responses. - hcc.stream(ctx, hc, func(shr *querypb.StreamHealthResponse) error { + hcc.stream(streamCtx, hc, func(shr *querypb.StreamHealthResponse) error { // We received a message. Reset the back-off. retryDelay = hc.retryDelay + // Don't block on send to avoid deadlocks. 
+ select { + case servingStatus <- shr.Serving: + default: + } return hcc.processResponse(hc, shr) }) + // streamCancel to make sure the watcher goroutine terminates. + streamCancel() + + // If there was a timeout send an error. We do this after stream has returned. + // This will ensure that this update prevails over any previous message that + // stream could have sent. + if timedout.Get() { + hcc.tabletStats.LastError = fmt.Errorf("healthcheck timed out (latest %v)", hcc.lastResponseTimestamp) + hcc.setServingState(false, hcc.tabletStats.LastError.Error()) + hc.updateHealth(hcc.tabletStats.Copy(), hcc.conn) + hcErrorCounters.Add([]string{hcc.tabletStats.Target.Keyspace, hcc.tabletStats.Target.Shard, topoproto.TabletTypeLString(hcc.tabletStats.Target.TabletType)}, 1) + } + // Streaming RPC failed e.g. because vttablet was restarted or took too long. // Sleep until the next retry is up or the context is done/canceled. select { @@ -548,41 +615,24 @@ func (hcc *healthCheckConn) setServingState(serving bool, reason string) { // stream streams healthcheck responses to callback. 
func (hcc *healthCheckConn) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*querypb.StreamHealthResponse) error) { - hcc.mu.Lock() - conn := hcc.conn - hcc.mu.Unlock() - - if conn == nil { - var err error - conn, err = tabletconn.GetDialer()(hcc.tabletStats.Tablet, grpcclient.FailFast(true)) + if hcc.conn == nil { + conn, err := tabletconn.GetDialer()(hcc.tabletStats.Tablet, grpcclient.FailFast(true)) if err != nil { - hcc.mu.Lock() hcc.tabletStats.LastError = err - hcc.mu.Unlock() return } - - hcc.mu.Lock() hcc.conn = conn hcc.tabletStats.LastError = nil - hcc.mu.Unlock() } - if err := conn.StreamHealth(ctx, callback); err != nil { - hcc.mu.Lock() - hcc.conn = nil + if err := hcc.conn.StreamHealth(ctx, callback); err != nil { hcc.setServingState(false, err.Error()) hcc.tabletStats.LastError = err - ts := hcc.tabletStats - hcc.mu.Unlock() - // notify downstream for serving status change - if hc.listener != nil { - hc.listener.StatsUpdate(&ts) - } - conn.Close(ctx) - return + // Send nil because we intend to close the connection. + hc.updateHealth(hcc.tabletStats.Copy(), nil) + hcc.conn.Close(ctx) + hcc.conn = nil } - return } // processResponse reads one health check response, and notifies HealthCheckStatsListener. @@ -598,10 +648,6 @@ func (hcc *healthCheckConn) processResponse(hc *HealthCheckImpl, shr *querypb.St return fmt.Errorf("health stats is not valid: %v", shr) } - hcc.mu.RLock() - oldTs := hcc.tabletStats - hcc.mu.RUnlock() - // an app-level error from tablet, force serving state. var healthErr error serving := shr.Serving @@ -610,28 +656,10 @@ func (hcc *healthCheckConn) processResponse(hc *HealthCheckImpl, shr *querypb.St serving = false } - // oldTs.Tablet.Alias.Uid may be 0 because the youtube internal mechanism uses a different + // hcc.tabletStats.Tablet.Alias.Uid may be 0 because the youtube internal mechanism uses a different // code path to initialize this value. If so, we should skip this check. 
- if shr.TabletAlias != nil && oldTs.Tablet.Alias.Uid != 0 && !proto.Equal(shr.TabletAlias, oldTs.Tablet.Alias) { - return fmt.Errorf("health stats mismatch, tablet %+v alias does not match response alias %v", oldTs.Tablet, shr.TabletAlias) - } - - // In the case where a tablet changes type (but not for the - // initial message), we want to log it, and maybe advertise it too. - if hcc.tabletStats.Target.TabletType != topodatapb.TabletType_UNKNOWN && hcc.tabletStats.Target.TabletType != shr.Target.TabletType { - // Log and maybe notify - log.Infof("HealthCheckUpdate(Type Change): %v, tablet: %s, target %+v => %+v, reparent time: %v", - oldTs.Name, topotools.TabletIdent(oldTs.Tablet), topotools.TargetIdent(oldTs.Target), topotools.TargetIdent(shr.Target), shr.TabletExternallyReparentedTimestamp) - if hc.listener != nil && hc.sendDownEvents { - oldTs.Up = false - hc.listener.StatsUpdate(&oldTs) - } - - // Track how often a tablet gets promoted to master. It is used for - // comparing against the variables in go/vtgate/buffer/variables.go. - if oldTs.Target.TabletType != topodatapb.TabletType_MASTER && shr.Target.TabletType == topodatapb.TabletType_MASTER { - hcMasterPromotedCounters.Add([]string{shr.Target.Keyspace, shr.Target.Shard}, 1) - } + if shr.TabletAlias != nil && hcc.tabletStats.Tablet.Alias.Uid != 0 && !proto.Equal(shr.TabletAlias, hcc.tabletStats.Tablet.Alias) { + return fmt.Errorf("health stats mismatch, tablet %+v alias does not match response alias %v", hcc.tabletStats.Tablet, shr.TabletAlias) } // In this case where a new tablet is initialized or a tablet type changes, we want to @@ -642,18 +670,6 @@ func (hcc *healthCheckConn) processResponse(hc *HealthCheckImpl, shr *querypb.St // Update our record, and notify downstream for tabletType and // realtimeStats change. 
- ts := hcc.update(shr, serving, healthErr) - if hc.listener != nil { - hc.listener.StatsUpdate(&ts) - } - return nil -} - -// update updates the stats of a healthCheckConn, and returns a copy -// of its tabletStats. -func (hcc *healthCheckConn) update(shr *querypb.StreamHealthResponse, serving bool, healthErr error) TabletStats { - hcc.mu.Lock() - defer hcc.mu.Unlock() hcc.lastResponseTimestamp = time.Now() hcc.tabletStats.Target = shr.Target hcc.tabletStats.TabletExternallyReparentedTimestamp = shr.TabletExternallyReparentedTimestamp @@ -664,55 +680,8 @@ func (hcc *healthCheckConn) update(shr *querypb.StreamHealthResponse, serving bo reason = "healthCheck update error: " + healthErr.Error() } hcc.setServingState(serving, reason) - return hcc.tabletStats -} - -func (hc *HealthCheckImpl) checkHealthCheckTimeout() { - hc.mu.RLock() - list := make([]*healthCheckConn, 0, len(hc.addrToConns)) - for _, hcc := range hc.addrToConns { - list = append(list, hcc) - } - hc.mu.RUnlock() - for _, hcc := range list { - hcc.mu.RLock() - if !hcc.tabletStats.Serving { - // ignore non-serving tablet - hcc.mu.RUnlock() - continue - } - if time.Now().Sub(hcc.lastResponseTimestamp) < hc.healthCheckTimeout { - // received a healthcheck response recently - hcc.mu.RUnlock() - continue - } - hcc.mu.RUnlock() - // mark the tablet non-serving as we have not seen a health check response for a long time - hcc.mu.Lock() - // check again to avoid race condition - if !hcc.tabletStats.Serving { - // ignore non-serving tablet - hcc.mu.Unlock() - continue - } - if time.Now().Sub(hcc.lastResponseTimestamp) < hc.healthCheckTimeout { - // received a healthcheck response recently - hcc.mu.Unlock() - continue - } - - //Timeout detected. Cancel the current streaming RPC and let checkConn() restart it. 
- hcc.streamCancelFunc() - hcc.tabletStats.LastError = fmt.Errorf("healthcheck timed out (latest %v)", hcc.lastResponseTimestamp) - hcc.setServingState(false, hcc.tabletStats.LastError.Error()) - ts := hcc.tabletStats - hcc.mu.Unlock() - // notify downstream for serving status change - if hc.listener != nil { - hc.listener.StatsUpdate(&ts) - } - hcErrorCounters.Add([]string{ts.Target.Keyspace, ts.Target.Shard, topoproto.TabletTypeLString(ts.Target.TabletType)}, 1) - } + hc.updateHealth(hcc.tabletStats.Copy(), hcc.conn) + return nil } func (hc *HealthCheckImpl) deleteConn(tablet *topodatapb.Tablet) { @@ -720,16 +689,13 @@ func (hc *HealthCheckImpl) deleteConn(tablet *topodatapb.Tablet) { defer hc.mu.Unlock() key := TabletToMapKey(tablet) - hcc, ok := hc.addrToConns[key] + th, ok := hc.addrToHealth[key] if !ok { - log.Warningf("deleting unknown tablet: %+v", tablet) return } - hcc.mu.Lock() - hcc.tabletStats.Up = false - hcc.mu.Unlock() - hcc.cancelFunc() - delete(hc.addrToConns, key) + th.latestTabletStats.Up = false + th.cancelFunc() + delete(hc.addrToHealth, key) } // SetListener sets the listener for healthcheck updates. @@ -742,7 +708,7 @@ func (hc *HealthCheckImpl) SetListener(listener HealthCheckStatsListener, sendDo hc.mu.Lock() defer hc.mu.Unlock() - if len(hc.addrToConns) > 0 { + if len(hc.addrToHealth) > 0 { panic("must not call SetListener after tablets were added") } @@ -757,8 +723,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet, name string) { ctx, cancelFunc := context.WithCancel(context.Background()) key := TabletToMapKey(tablet) hcc := &healthCheckConn{ - ctx: ctx, - cancelFunc: cancelFunc, + ctx: ctx, tabletStats: TabletStats{ Key: key, Tablet: tablet, @@ -768,16 +733,24 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet, name string) { }, } hc.mu.Lock() - if _, ok := hc.addrToConns[key]; ok { + if hc.addrToHealth == nil { + // already closed. 
+ hc.mu.Unlock() + return + } + if _, ok := hc.addrToHealth[key]; ok { hc.mu.Unlock() log.Warningf("adding duplicate tablet %v for %v: %+v", name, tablet.Alias.Cell, tablet) return } - hc.addrToConns[key] = hcc + hc.addrToHealth[key] = &tabletHealth{ + cancelFunc: cancelFunc, + latestTabletStats: hcc.tabletStats, + } hc.initialUpdatesWG.Add(1) + hc.connsWG.Add(1) hc.mu.Unlock() - hc.wg.Add(1) go hc.checkConn(hcc, name) } @@ -803,16 +776,14 @@ func (hc *HealthCheckImpl) WaitForInitialStatsUpdates() { // GetConnection returns the TabletConn of the given tablet. func (hc *HealthCheckImpl) GetConnection(key string) queryservice.QueryService { - hc.mu.RLock() - hcc := hc.addrToConns[key] - if hcc == nil { - hc.mu.RUnlock() + hc.mu.Lock() + defer hc.mu.Unlock() + + th := hc.addrToHealth[key] + if th == nil { return nil } - hc.mu.RUnlock() - hcc.mu.RLock() - defer hcc.mu.RUnlock() - return hcc.conn + return th.conn } // TabletsCacheStatus is the current tablets for a cell/target. @@ -912,22 +883,20 @@ func (hc *HealthCheckImpl) CacheStatus() TabletsCacheStatusList { func (hc *HealthCheckImpl) cacheStatusMap() map[string]*TabletsCacheStatus { tcsMap := make(map[string]*TabletsCacheStatus) - hc.mu.RLock() - defer hc.mu.RUnlock() - for _, hcc := range hc.addrToConns { - hcc.mu.RLock() - key := fmt.Sprintf("%v.%v.%v.%v", hcc.tabletStats.Tablet.Alias.Cell, hcc.tabletStats.Target.Keyspace, hcc.tabletStats.Target.Shard, hcc.tabletStats.Target.TabletType.String()) + hc.mu.Lock() + defer hc.mu.Unlock() + for _, th := range hc.addrToHealth { + key := fmt.Sprintf("%v.%v.%v.%v", th.latestTabletStats.Tablet.Alias.Cell, th.latestTabletStats.Target.Keyspace, th.latestTabletStats.Target.Shard, th.latestTabletStats.Target.TabletType.String()) var tcs *TabletsCacheStatus var ok bool if tcs, ok = tcsMap[key]; !ok { tcs = &TabletsCacheStatus{ - Cell: hcc.tabletStats.Tablet.Alias.Cell, - Target: hcc.tabletStats.Target, + Cell: th.latestTabletStats.Tablet.Alias.Cell, + Target: 
th.latestTabletStats.Target, } tcsMap[key] = tcs } - stats := hcc.tabletStats - hcc.mu.RUnlock() + stats := th.latestTabletStats tcs.TabletsStats = append(tcs.TabletsStats, &stats) } return tcsMap @@ -938,18 +907,17 @@ func (hc *HealthCheckImpl) cacheStatusMap() map[string]*TabletsCacheStatus { // currently executing and won't be called again. func (hc *HealthCheckImpl) Close() error { hc.mu.Lock() - close(hc.closeChan) - for _, hcc := range hc.addrToConns { - hcc.cancelFunc() + for _, th := range hc.addrToHealth { + th.cancelFunc() } - hc.addrToConns = make(map[string]*healthCheckConn) + hc.addrToHealth = nil // Release the lock early or a pending checkHealthCheckTimeout // cannot get a read lock on it. hc.mu.Unlock() // Wait for the checkHealthCheckTimeout Go routine and each Go // routine per tablet. - hc.wg.Wait() + hc.connsWG.Wait() return nil } diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index 1bb7d570145..616a15dfdff 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -545,6 +545,25 @@ func TestHealthCheckTimeout(t *testing.T) { t.Errorf("StreamHealth should be canceled after timeout, but is not") } + // repeat the wait. There should be no error or cancelation. 
+ fc.resetCanceledFlag() + time.Sleep(2 * timeout) + t.Logf(`Sleep(2 * timeout)`) + + select { + case res = <-l.output: + t.Errorf(`<-l.output: %+v; want not message`, res) + default: + } + + if err := checkErrorCounter("k", "s", topodatapb.TabletType_MASTER, 1); err != nil { + t.Errorf("%v", err) + } + + if fc.isCanceled() { + t.Errorf("StreamHealth should not be canceled after timeout") + } + // send a healthcheck response, it should be serving again input <- shr t.Logf(`input <- {{Keyspace: "k", Shard: "s", TabletType: MASTER}, Serving: true, TabletExternallyReparentedTimestamp: 10, {SecondsBehindMaster: 1, CpuUsage: 0.2}}`) @@ -694,6 +713,12 @@ func (fc *fakeConn) isCanceled() bool { return fc.canceled } +func (fc *fakeConn) resetCanceledFlag() { + fc.mu.Lock() + defer fc.mu.Unlock() + fc.canceled = false +} + func checkErrorCounter(keyspace, shard string, tabletType topodatapb.TabletType, want int64) error { statsKey := []string{keyspace, shard, topoproto.TabletTypeLString(tabletType)} name := strings.Join(statsKey, ".") diff --git a/go/vt/worker/split_clone_test.go b/go/vt/worker/split_clone_test.go index 184e87d1dc1..993771b2232 100644 --- a/go/vt/worker/split_clone_test.go +++ b/go/vt/worker/split_clone_test.go @@ -1019,7 +1019,7 @@ func TestSplitCloneV2_NoMasterAvailable(t *testing.T) { select { case <-ctx.Done(): - t.Fatalf("timed out waiting for vtworker to retry due to NoMasterAvailable: %v", ctx.Err()) + panic(fmt.Errorf("timed out waiting for vtworker to retry due to NoMasterAvailable: %v", ctx.Err())) case <-time.After(10 * time.Millisecond): // Poll constantly. }