diff --git a/go/vt/topo/consultopo/watch.go b/go/vt/topo/consultopo/watch.go index a0d05999fca..c77b7630c50 100644 --- a/go/vt/topo/consultopo/watch.go +++ b/go/vt/topo/consultopo/watch.go @@ -58,6 +58,12 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < go func() { defer close(notifications) + var getCtx context.Context + // Initialize to no-op function to avoid having to check for nil. + cancelGetCtx := func() {} + + defer cancelGetCtx() + for { // Wait/poll until we get a new version. // Get with a WaitIndex and WaitTime will return @@ -69,9 +75,18 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < WaitIndex: waitIndex, WaitTime: *watchPollDuration, } - pair, _, err = s.kv.Get(nodePath, opts.WithContext(watchCtx)) + + // Make a new Context for just this one Get() call. + // The server should send us something after WaitTime at the latest. + // If it takes more than 2x that long, assume we've lost contact. + // This essentially uses WaitTime as a heartbeat interval to detect + // a dead connection. + cancelGetCtx() + getCtx, cancelGetCtx = context.WithTimeout(watchCtx, 2*opts.WaitTime) + + pair, _, err = s.kv.Get(nodePath, opts.WithContext(getCtx)) if err != nil { - // Serious error or context cancelled. + // Serious error or context timeout/cancelled. notifications <- &topo.WatchData{ Err: convertError(err, nodePath), }