Skip to content

Commit

Permalink
healthcheck uses parent context if available now
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Koptelov committed Jun 1, 2017
1 parent 6391f1d commit 5cbeafa
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 15 deletions.
34 changes: 21 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,9 @@ func NewClient(options ...ClientOptionFunc) (*Client, error) {

if c.snifferEnabled {
// Sniff the cluster initially
if err := c.sniff(c.snifferTimeoutStartup); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
if err := c.sniff(ctx, c.snifferTimeoutStartup); err != nil {
return nil, err
}
} else {
Expand All @@ -239,7 +241,9 @@ func NewClient(options ...ClientOptionFunc) (*Client, error) {

if c.healthcheckEnabled {
// Perform an initial health check
c.healthcheck(c.healthcheckTimeoutStartup, true)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
c.healthcheck(ctx, c.healthcheckTimeoutStartup, true)
}
// Ensure that we have at least one connection available
if err := c.mustActiveConn(); err != nil {
Expand Down Expand Up @@ -684,7 +688,9 @@ func (c *Client) sniffer() {
c.snifferStop <- true
return
case <-ticker.C:
c.sniff(timeout)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
c.sniff(ctx, timeout)
}
}
}
Expand All @@ -694,7 +700,7 @@ func (c *Client) sniffer() {
// by the preceding sniffing process (if sniffing is enabled).
//
// If sniffing is disabled, this is a no-op.
func (c *Client) sniff(timeout time.Duration) error {
func (c *Client) sniff(ctx context.Context, timeout time.Duration) error {
c.mu.RLock()
if !c.snifferEnabled {
c.mu.RUnlock()
Expand Down Expand Up @@ -731,7 +737,7 @@ func (c *Client) sniff(timeout time.Duration) error {
// Start sniffing on all found URLs
ch := make(chan []*conn, len(urls))
for _, url := range urls {
go func(url string) { ch <- c.sniffNode(url) }(url)
go func(url string) { ch <- c.sniffNode(ctx, url) }(url)
}

// Wait for the results to come back, or the process times out.
Expand All @@ -757,11 +763,11 @@ var reSniffHostAndPort = regexp.MustCompile(`\/([^:]*):([0-9]+)\]`)
// in sniff. If successful, it returns the list of node URLs extracted
// from the result of calling Nodes Info API. Otherwise, an empty array
// is returned.
func (c *Client) sniffNode(url string) []*conn {
func (c *Client) sniffNode(ctx context.Context, url string) []*conn {
nodes := make([]*conn, 0)

// Call the Nodes Info API at /_nodes/http
req, err := NewRequest("GET", url+"/_nodes/http")
req, err := NewRequest(ctx, "GET", url+"/_nodes/http")
if err != nil {
return nodes
}
Expand Down Expand Up @@ -859,7 +865,9 @@ func (c *Client) healthchecker() {
c.healthcheckStop <- true
return
case <-ticker.C:
c.healthcheck(timeout, false)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
c.healthcheck(ctx, timeout, false)
}
}
}
Expand All @@ -868,7 +876,7 @@ func (c *Client) healthchecker() {
// the node state, it marks connections as dead, sets them alive etc.
// If healthchecks are disabled this is a no-op.
// The timeout specifies how long to wait for a response from Elasticsearch.
func (c *Client) healthcheck(timeout time.Duration, force bool) {
func (c *Client) healthcheck(ctx context.Context, timeout time.Duration, force bool) {
c.mu.RLock()
if !c.healthcheckEnabled && !force {
c.mu.RUnlock()
Expand All @@ -888,7 +896,7 @@ func (c *Client) healthcheck(timeout time.Duration, force bool) {
for _, conn := range conns {
params := make(url.Values)
params.Set("timeout", fmt.Sprintf("%dms", timeoutInMillis))
req, err := NewRequest("HEAD", conn.URL()+"/?"+params.Encode())
req, err := NewRequest(ctx, "HEAD", conn.URL()+"/?"+params.Encode())
if err == nil {
if basicAuth {
req.SetBasicAuth(basicAuthUsername, basicAuthPassword)
Expand Down Expand Up @@ -1064,7 +1072,7 @@ func (c *Client) PerformRequestC(ctx context.Context, method, path string, param
n++
if !retried {
// Force a healtcheck as all connections seem to be dead.
c.healthcheck(timeout, false)
c.healthcheck(ctx, timeout, false)
}
wait, ok, rerr := c.retrier.Retry(n, nil, nil, err)
if rerr != nil {
Expand All @@ -1082,7 +1090,7 @@ func (c *Client) PerformRequestC(ctx context.Context, method, path string, param
return nil, err
}

req, err = NewRequest(method, conn.URL()+pathWithParams)
req, err = NewRequest(ctx, method, conn.URL()+pathWithParams)
if err != nil {
c.errorf("elastic: cannot create request for %s %s: %v", strings.ToUpper(method), conn.URL()+pathWithParams, err)
return nil, err
Expand All @@ -1105,7 +1113,7 @@ func (c *Client) PerformRequestC(ctx context.Context, method, path string, param
c.dumpRequest((*http.Request)(req))

// Get response
res, err := c.c.Do(((*http.Request)(req)).WithContext(ctx))
res, err := c.c.Do((*http.Request)(req))
if err != nil {
// Return ctx error if available, so we can compare it
if ctx.Err() != nil {
Expand Down
9 changes: 8 additions & 1 deletion ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package elastic

import (
"context"
"encoding/json"
"net/http"
"net/url"
Expand Down Expand Up @@ -73,6 +74,12 @@ func (s *PingService) Pretty(pretty bool) *PingService {
// Do returns the PingResult, the HTTP status code of the Elasticsearch
// server, and an error.
func (s *PingService) Do() (*PingResult, int, error) {
return s.DoC(context.Background())
}

// DoC returns the PingResult, the HTTP status code of the Elasticsearch
// server, and an error.
func (s *PingService) DoC(ctx context.Context) (*PingResult, int, error) {
s.client.mu.RLock()
basicAuth := s.client.basicAuth
basicAuthUsername := s.client.basicAuthUsername
Expand Down Expand Up @@ -100,7 +107,7 @@ func (s *PingService) Do() (*PingResult, int, error) {
}

// Notice: This service must NOT use PerformRequest!
req, err := NewRequest(method, url_)
req, err := NewRequest(ctx, method, url_)
if err != nil {
return nil, 0, err
}
Expand Down
4 changes: 3 additions & 1 deletion request.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package elastic
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"io"
"io/ioutil"
Expand All @@ -19,11 +20,12 @@ import (
type Request http.Request

// NewRequest is a http.Request and adds features such as encoding the body.
func NewRequest(method, url string) (*Request, error) {
func NewRequest(ctx context.Context, method, url string) (*Request, error) {
req, err := http.NewRequest(method, url, nil)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
req.Header.Add("User-Agent", "elastic/"+Version+" ("+runtime.GOOS+"-"+runtime.GOARCH+")")
req.Header.Add("Accept", "application/json")
return (*Request)(req), nil
Expand Down

0 comments on commit 5cbeafa

Please sign in to comment.