From 5cbeafae4551da720f052fd3f479be11e160fa47 Mon Sep 17 00:00:00 2001 From: Nikita Koptelov Date: Thu, 1 Jun 2017 18:28:20 +0300 Subject: [PATCH] healthcheck uses parent context if available now --- client.go | 34 +++++++++++++++++++++------------- ping.go | 9 ++++++++- request.go | 4 +++- 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/client.go b/client.go index 73f74888d..068324828 100644 --- a/client.go +++ b/client.go @@ -227,7 +227,9 @@ func NewClient(options ...ClientOptionFunc) (*Client, error) { if c.snifferEnabled { // Sniff the cluster initially - if err := c.sniff(c.snifferTimeoutStartup); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + if err := c.sniff(ctx, c.snifferTimeoutStartup); err != nil { return nil, err } } else { @@ -239,7 +241,9 @@ func NewClient(options ...ClientOptionFunc) (*Client, error) { if c.healthcheckEnabled { // Perform an initial health check - c.healthcheck(c.healthcheckTimeoutStartup, true) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + c.healthcheck(ctx, c.healthcheckTimeoutStartup, true) } // Ensure that we have at least one connection available if err := c.mustActiveConn(); err != nil { @@ -684,7 +688,9 @@ func (c *Client) sniffer() { c.snifferStop <- true return case <-ticker.C: - c.sniff(timeout) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + c.sniff(ctx, timeout) } } } @@ -694,7 +700,7 @@ func (c *Client) sniffer() { // by the preceding sniffing process (if sniffing is enabled). // // If sniffing is disabled, this is a no-op. -func (c *Client) sniff(timeout time.Duration) error { +func (c *Client) sniff(ctx context.Context, timeout time.Duration) error { c.mu.RLock() if !c.snifferEnabled { c.mu.RUnlock() @@ -731,7 +737,7 @@ func (c *Client) sniff(timeout time.Duration) error { // Start sniffing on all found URLs ch := make(chan []*conn, len(urls)) for _, url := range urls { - go func(url string) { ch <- c.sniffNode(url) }(url) + go func(url string) { ch <- c.sniffNode(ctx, url) }(url) } // Wait for the results to come back, or the process times out. @@ -757,11 +763,11 @@ var reSniffHostAndPort = regexp.MustCompile(`\/([^:]*):([0-9]+)\]`) // in sniff. If successful, it returns the list of node URLs extracted // from the result of calling Nodes Info API. Otherwise, an empty array // is returned. -func (c *Client) sniffNode(url string) []*conn { +func (c *Client) sniffNode(ctx context.Context, url string) []*conn { nodes := make([]*conn, 0) // Call the Nodes Info API at /_nodes/http - req, err := NewRequest("GET", url+"/_nodes/http") + req, err := NewRequest(ctx, "GET", url+"/_nodes/http") if err != nil { return nodes } @@ -859,7 +865,9 @@ func (c *Client) healthchecker() { c.healthcheckStop <- true return case <-ticker.C: - c.healthcheck(timeout, false) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + c.healthcheck(ctx, timeout, false) } } } @@ -868,7 +876,7 @@ func (c *Client) healthchecker() { // the node state, it marks connections as dead, sets them alive etc. // If healthchecks are disabled this is a no-op. // The timeout specifies how long to wait for a response from Elasticsearch. -func (c *Client) healthcheck(timeout time.Duration, force bool) { +func (c *Client) healthcheck(ctx context.Context, timeout time.Duration, force bool) { c.mu.RLock() if !c.healthcheckEnabled && !force { c.mu.RUnlock() @@ -888,7 +896,7 @@ func (c *Client) healthcheck(timeout time.Duration, force bool) { for _, conn := range conns { params := make(url.Values) params.Set("timeout", fmt.Sprintf("%dms", timeoutInMillis)) - req, err := NewRequest("HEAD", conn.URL()+"/?"+params.Encode()) + req, err := NewRequest(ctx, "HEAD", conn.URL()+"/?"+params.Encode()) if err == nil { if basicAuth { req.SetBasicAuth(basicAuthUsername, basicAuthPassword) @@ -1064,7 +1072,7 @@ func (c *Client) PerformRequestC(ctx context.Context, method, path string, param n++ if !retried { // Force a healtcheck as all connections seem to be dead. - c.healthcheck(timeout, false) + c.healthcheck(ctx, timeout, false) } wait, ok, rerr := c.retrier.Retry(n, nil, nil, err) if rerr != nil { @@ -1082,7 +1090,7 @@ func (c *Client) PerformRequestC(ctx context.Context, method, path string, param return nil, err } - req, err = NewRequest(method, conn.URL()+pathWithParams) + req, err = NewRequest(ctx, method, conn.URL()+pathWithParams) if err != nil { c.errorf("elastic: cannot create request for %s %s: %v", strings.ToUpper(method), conn.URL()+pathWithParams, err) return nil, err @@ -1105,7 +1113,7 @@ func (c *Client) PerformRequestC(ctx context.Context, method, path string, param c.dumpRequest((*http.Request)(req)) // Get response - res, err := c.c.Do(((*http.Request)(req)).WithContext(ctx)) + res, err := c.c.Do((*http.Request)(req)) if err != nil { // Return ctx error if available, so we can compare it if ctx.Err() != nil { diff --git a/ping.go b/ping.go index 44390d19a..0ff70b57f 100644 --- a/ping.go +++ b/ping.go @@ -5,6 +5,7 @@ package elastic import ( + "context" "encoding/json" "net/http" "net/url" @@ -73,6 +74,12 @@ func (s *PingService) Pretty(pretty bool) *PingService { // Do returns the PingResult, the HTTP status code of the Elasticsearch // server, and an error. func (s *PingService) Do() (*PingResult, int, error) { + return s.DoC(context.Background()) +} + +// DoC returns the PingResult, the HTTP status code of the Elasticsearch +// server, and an error. +func (s *PingService) DoC(ctx context.Context) (*PingResult, int, error) { s.client.mu.RLock() basicAuth := s.client.basicAuth basicAuthUsername := s.client.basicAuthUsername @@ -100,7 +107,7 @@ func (s *PingService) Do() (*PingResult, int, error) { } // Notice: This service must NOT use PerformRequest! - req, err := NewRequest(method, url_) + req, err := NewRequest(ctx, method, url_) if err != nil { return nil, 0, err } diff --git a/request.go b/request.go index 1347e1b6f..0c2ba0d32 100644 --- a/request.go +++ b/request.go @@ -7,6 +7,7 @@ package elastic import ( "bytes" "compress/gzip" + "context" "encoding/json" "io" "io/ioutil" @@ -19,11 +20,12 @@ import ( type Request http.Request // NewRequest is a http.Request and adds features such as encoding the body. -func NewRequest(method, url string) (*Request, error) { +func NewRequest(ctx context.Context, method, url string) (*Request, error) { req, err := http.NewRequest(method, url, nil) if err != nil { return nil, err } + req = req.WithContext(ctx) req.Header.Add("User-Agent", "elastic/"+Version+" ("+runtime.GOOS+"-"+runtime.GOARCH+")") req.Header.Add("Accept", "application/json") return (*Request)(req), nil