Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions go/vt/topo/consultopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
go func() {
defer close(notifications)

var getCtx context.Context
// Initialize to no-op function to avoid having to check for nil.
cancelGetCtx := func() {}

defer cancelGetCtx()

for {
// Wait/poll until we get a new version.
// Get with a WaitIndex and WaitTime will return
Expand All @@ -69,9 +75,18 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
WaitIndex: waitIndex,
WaitTime: *watchPollDuration,
}
pair, _, err = s.kv.Get(nodePath, opts.WithContext(watchCtx))

// Make a new Context for just this one Get() call.
// The server should send us something after WaitTime at the latest.
// If it takes more than 2x that long, assume we've lost contact.
// This essentially uses WaitTime as a heartbeat interval to detect
// a dead connection.
cancelGetCtx()
getCtx, cancelGetCtx = context.WithTimeout(watchCtx, 2*opts.WaitTime)

pair, _, err = s.kv.Get(nodePath, opts.WithContext(getCtx))
if err != nil {
// Serious error or context cancelled.
// Serious error or context timeout/cancelled.
notifications <- &topo.WatchData{
Err: convertError(err, nodePath),
}
Expand Down