diff --git a/.travis.yml b/.travis.yml index 7824d9b39..8f37e2997 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,7 +11,7 @@ env: global: - GO15VENDOREXPERIMENT=1 matrix: - - ES_VERSION=2.4.4 + - ES_VERSION=2.4.6 allow_failures: - go: tip before_script: diff --git a/CONTRIBUTORS b/CONTRIBUTORS index 459c2382b..6bce05cf7 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -10,18 +10,21 @@ Adam Alix [@adamalix](https://github.com/adamalix) Adam Weiner [@adamweiner](https://github.com/adamweiner) Adrian Lungu [@AdrianLungu](https://github.com/AdrianLungu) +alehano [@alehano](https://github.com/alehano) Alex [@akotlar](https://github.com/akotlar) Alexandre Olivier [@aliphen](https://github.com/aliphen) Alexey Sharov [@nizsheanez](https://github.com/nizsheanez) AndreKR [@AndreKR](https://github.com/AndreKR) Andrew Dunham [@andrew-d](https://github.com/andrew-d) Andrew Gaul [@andrewgaul](https://github.com/andrewgaul) +Arquivei [@arquivei](https://github.com/arquivei) Benjamin Fernandes [@LotharSee](https://github.com/LotharSee) Benjamin Zarzycki [@kf6nux](https://github.com/kf6nux) Braden Bassingthwaite [@bbassingthwaite-va](https://github.com/bbassingthwaite-va) Brady Love [@bradylove](https://github.com/bradylove) Bryan Conklin [@bmconklin](https://github.com/bmconklin) Bruce Zhou [@brucez-isell](https://github.com/brucez-isell) +cforbes [@cforbes](https://github.com/cforbes) Chris M [@tebriel](https://github.com/tebriel) Christophe Courtaut [@kri5](https://github.com/kri5) Conrad Pankoff [@deoxxa](https://github.com/deoxxa) @@ -31,8 +34,11 @@ Daniel Heckrath [@DanielHeckrath](https://github.com/DanielHeckrath) Daniel Imfeld [@dimfeld](https://github.com/dimfeld) Dwayne Schultz [@myshkin5](https://github.com/myshkin5) Ellison Leão [@ellisonleao](https://github.com/ellisonleao) +Erwin [@eticzon](https://github.com/eticzon) Eugene Egorov [@EugeneEgorov](https://github.com/EugeneEgorov) +Fanfan [@wenpos](https://github.com/wenpos) Faolan C-P [@fcheslack](https://github.com/fcheslack) +Filip Tepper [@filiptepper](https://github.com/filiptepper) Gerhard Häring [@ghaering](https://github.com/ghaering) Guilherme Silveira [@guilherme-santos](https://github.com/guilherme-santos) Guillaume J. Charmes [@creack](https://github.com/creack) @@ -54,6 +60,7 @@ John Stanford [@jxstanford](https://github.com/jxstanford) jun [@coseyo](https://github.com/coseyo) Junpei Tsuji [@jun06t](https://github.com/jun06t) Kenta SUZUKI [@suzuken](https://github.com/suzuken) +Kevin Mulvey [@kmulvey](https://github.com/kmulvey) Kyle Brandt [@kylebrandt](https://github.com/kylebrandt) Leandro Piccilli [@lpic10](https://github.com/lpic10) Maciej Lisiewski [@c2h5oh](https://github.com/c2h5oh) @@ -71,6 +78,7 @@ Nicholas Wolff [@nwolff](https://github.com/nwolff) Nick K [@utrack](https://github.com/utrack) Nick Whyte [@nickw444](https://github.com/nickw444) Orne Brocaar [@brocaar](https://github.com/brocaar) +Pete C [@peteclark-ft](https://github.com/peteclark-ft) Radoslaw Wesolowski [r--w](https://github.com/r--w) Ryan Schmukler [@rschmukler](https://github.com/rschmukler) Sacheendra talluri [@sacheendra](https://github.com/sacheendra) @@ -85,6 +93,7 @@ Tetsuya Morimoto [@t2y](https://github.com/t2y) TimeEmit [@TimeEmit](https://github.com/timeemit) TusharM [@tusharm](https://github.com/tusharm) wangtuo [@wangtuo](https://github.com/wangtuo) +Wédney Yuri [@wedneyyuri](https://github.com/wedneyyuri) wolfkdy [@wolfkdy](https://github.com/wolfkdy) Wyndham Blanton [@wyndhblb](https://github.com/wyndhblb) Yarden Bar [@ayashjorden](https://github.com/ayashjorden) diff --git a/client.go b/client.go index 63844406f..3879baec1 100644 --- a/client.go +++ b/client.go @@ -799,8 +799,12 @@ func (c *Client) sniff(timeout time.Duration) error { // Start sniffing on all found URLs ch := make(chan []*conn, len(urls)) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + for _, url := range urls { - go func(url string) { ch <- c.sniffNode(url) }(url) + go func(url string) { ch <- c.sniffNode(ctx, url) }(url) } // Wait for the results to come back, or the process times out. @@ -811,7 +815,7 @@ func (c *Client) sniff(timeout time.Duration) error { c.updateConns(conns) return nil } - case <-time.After(timeout): + case <-ctx.Done(): // We get here if no cluster responds in time return ErrNoClient } @@ -822,7 +826,7 @@ func (c *Client) sniff(timeout time.Duration) error { // in sniff. If successful, it returns the list of node URLs extracted // from the result of calling Nodes Info API. Otherwise, an empty array // is returned. -func (c *Client) sniffNode(url string) []*conn { +func (c *Client) sniffNode(ctx context.Context, url string) []*conn { var nodes []*conn // Call the Nodes Info API at /_nodes/http @@ -840,7 +844,7 @@ func (c *Client) sniffNode(url string) []*conn { } c.mu.RUnlock() - res, err := c.c.Do((*http.Request)(req)) + res, err := c.c.Do((*http.Request)(req).WithContext(ctx)) if err != nil { return nodes } @@ -974,37 +978,55 @@ func (c *Client) healthcheck(timeout time.Duration, force bool) { conns := c.conns c.connsMu.RUnlock() - timeoutInMillis := int64(timeout / time.Millisecond) - for _, conn := range conns { - params := make(url.Values) - params.Set("timeout", fmt.Sprintf("%dms", timeoutInMillis)) - req, err := NewRequest("HEAD", conn.URL()+"/?"+params.Encode()) - if err == nil { + // Run the HEAD request against ES with a timeout + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // Goroutine executes the HTTP request, returns an error and sets status + var status int + errc := make(chan error, 1) + go func(url string) { + req, err := NewRequest("HEAD", url) + if err != nil { + errc <- err + return + } if basicAuth { req.SetBasicAuth(basicAuthUsername, basicAuthPassword) } if c.prepareRequest != nil { c.prepareRequest((*http.Request)(req)) } - res, err := c.c.Do((*http.Request)(req)) + res, err := c.c.Do((*http.Request)(req).WithContext(ctx)) if err == nil { + status = res.StatusCode if res.Body != nil { - defer res.Body.Close() + res.Body.Close() } - if res.StatusCode >= 200 && res.StatusCode < 300 { - conn.MarkAsAlive() - } else { - conn.MarkAsDead() - c.errorf("elastic: %s is dead [status=%d]", conn.URL(), res.StatusCode) - } - } else { - c.errorf("elastic: %s is dead", conn.URL()) - conn.MarkAsDead() } - } else { + errc <- err + }(conn.URL()) + + // Wait for the Goroutine (or its timeout) + select { + case <-ctx.Done(): // timeout c.errorf("elastic: %s is dead", conn.URL()) conn.MarkAsDead() + break + case err := <-errc: + if err != nil { + c.errorf("elastic: %s is dead", conn.URL()) + conn.MarkAsDead() + break + } + if status >= 200 && status < 300 { + conn.MarkAsAlive() + } else { + conn.MarkAsDead() + c.errorf("elastic: %s is dead [status=%d]", conn.URL(), status) + } + break } } } diff --git a/client_test.go b/client_test.go index a33e46137..1f2ab6f79 100644 --- a/client_test.go +++ b/client_test.go @@ -10,12 +10,15 @@ import ( "errors" "fmt" "log" + "net" "net/http" + "reflect" "regexp" "strings" "testing" "time" + "github.com/fortytw2/leaktest" "golang.org/x/net/context" ) @@ -264,6 +267,74 @@ func TestClientHealthcheckStartupTimeout(t *testing.T) { } } +func TestClientHealthcheckTimeoutLeak(t *testing.T) { + // This test test checks if healthcheck requests are canceled + // after timeout. + // It contains couple of hacks which won't be needed once we + // stop supporting Go1.7. + // On Go1.7 it uses server side effects to monitor if connection + // was closed, + // and on Go 1.8+ we're additionally honestly monitoring routine + // leaks via leaktest. + mux := http.NewServeMux() + + var reqDone bool + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + cn, ok := w.(http.CloseNotifier) + if !ok { + t.Fatalf("Writer is not CloseNotifier, but %v", reflect.TypeOf(w).Name()) + } + <-cn.CloseNotify() + reqDone = true + }) + + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Couldn't setup listener: %v", err) + } + addr := lis.Addr().String() + + srv := &http.Server{ + Handler: mux, + } + go srv.Serve(lis) + + cli := &Client{ + c: &http.Client{}, + conns: []*conn{ + &conn{ + url: "http://" + addr + "/", + }, + }, + } + + type closer interface { + Shutdown(context.Context) error + } + + // pre-Go1.8 Server can't Shutdown + cl, isServerCloseable := (interface{}(srv)).(closer) + + // Since Go1.7 can't Shutdown() - there will be leak from server + // Monitor leaks on Go 1.8+ + if isServerCloseable { + defer leaktest.CheckTimeout(t, time.Second*10)() + } + + cli.healthcheck(time.Millisecond*500, true) + + if isServerCloseable { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + cl.Shutdown(ctx) + } + + <-time.After(time.Second) + if !reqDone { + t.Fatal("Request wasn't canceled or stopped") + } +} + // -- NewSimpleClient -- func TestSimpleClientDefaults(t *testing.T) { @@ -400,7 +471,7 @@ func TestClientSniffNode(t *testing.T) { } ch := make(chan []*conn) - go func() { ch <- client.sniffNode(DefaultURL) }() + go func() { ch <- client.sniffNode(context.Background(), DefaultURL) }() select { case nodes := <-ch: @@ -454,6 +525,75 @@ func TestClientSniffOnDefaultURL(t *testing.T) { } } +func TestClientSniffTimeoutLeak(t *testing.T) { + // This test test checks if sniff requests are canceled + // after timeout. + // It contains couple of hacks which won't be needed once we + // stop supporting Go1.7. + // On Go1.7 it uses server side effects to monitor if connection + // was closed, + // and on Go 1.8+ we're additionally honestly monitoring routine + // leaks via leaktest. + mux := http.NewServeMux() + + var reqDone bool + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + cn, ok := w.(http.CloseNotifier) + if !ok { + t.Fatalf("Writer is not CloseNotifier, but %v", reflect.TypeOf(w).Name()) + } + <-cn.CloseNotify() + reqDone = true + }) + + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Couldn't setup listener: %v", err) + } + addr := lis.Addr().String() + + srv := &http.Server{ + Handler: mux, + } + go srv.Serve(lis) + + cli := &Client{ + c: &http.Client{}, + conns: []*conn{ + &conn{ + url: "http://" + addr + "/", + }, + }, + snifferEnabled: true, + } + + type closer interface { + Shutdown(context.Context) error + } + + // pre-Go1.8 Server can't Shutdown + cl, isServerCloseable := (interface{}(srv)).(closer) + + // Since Go1.7 can't Shutdown() - there will be leak from server + // Monitor leaks on Go 1.8+ + if isServerCloseable { + defer leaktest.CheckTimeout(t, time.Second*10)() + } + + cli.sniff(time.Millisecond * 500) + + if isServerCloseable { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + cl.Shutdown(ctx) + } + + <-time.After(time.Second) + if !reqDone { + t.Fatal("Request wasn't canceled or stopped") + } +} + func TestClientExtractHostname(t *testing.T) { tests := []struct { Scheme string