Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2: Added ability to do context-aware PerformRequest #503

Merged
merged 11 commits into from
Jun 16, 2017
8 changes: 7 additions & 1 deletion alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package elastic

import (
"context"
"fmt"
"net/url"
"strings"
Expand Down Expand Up @@ -251,6 +252,11 @@ func (s *AliasService) buildURL() (string, url.Values, error) {

// Do executes the command.
func (s *AliasService) Do() (*AliasResult, error) {
return s.DoC(nil)
}

// DoC executes the command, passing the context.
func (s *AliasService) DoC(ctx context.Context) (*AliasResult, error) {
path, params, err := s.buildURL()
if err != nil {
return nil, err
Expand All @@ -269,7 +275,7 @@ func (s *AliasService) Do() (*AliasResult, error) {
body["actions"] = actions

// Get response
res, err := s.client.PerformRequest("POST", path, params, body)
res, err := s.client.PerformRequestC(ctx, "POST", path, params, body)
if err != nil {
return nil, err
}
Expand Down
7 changes: 6 additions & 1 deletion aliases.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package elastic

import (
"context"
"fmt"
"net/url"
"strings"
Expand Down Expand Up @@ -42,6 +43,10 @@ func (s *AliasesService) Indices(indexNames ...string) *AliasesService {
}

func (s *AliasesService) Do() (*AliasesResult, error) {
return s.DoC(nil)
}

func (s *AliasesService) DoC(ctx context.Context) (*AliasesResult, error) {
var err error

// Build url
Expand Down Expand Up @@ -72,7 +77,7 @@ func (s *AliasesService) Do() (*AliasesResult, error) {
}

// Get response
res, err := s.client.PerformRequest("GET", path, params, nil)
res, err := s.client.PerformRequestC(ctx, "GET", path, params, nil)
if err != nil {
return nil, err
}
Expand Down
12 changes: 9 additions & 3 deletions bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package elastic

import (
"bytes"
"context"
"errors"
"fmt"
"net/url"
Expand Down Expand Up @@ -138,10 +139,15 @@ func (s *BulkService) bodyAsString() (string, error) {
return buf.String(), nil
}

// Do sends the batched requests to Elasticsearch. Note that, when successful,
// Do runs DoC() with default context.
func (s *BulkService) Do() (*BulkResponse, error) {
return s.DoC(nil)
}

// DoC sends the batched requests to Elasticsearch. Note that, when successful,
// you can reuse the BulkService for the next batch as the list of bulk
// requests is cleared on success.
func (s *BulkService) Do() (*BulkResponse, error) {
func (s *BulkService) DoC(ctx context.Context) (*BulkResponse, error) {
// No actions?
if s.NumberOfActions() == 0 {
return nil, errors.New("elastic: No bulk actions to commit")
Expand Down Expand Up @@ -188,7 +194,7 @@ func (s *BulkService) Do() (*BulkResponse, error) {
}

// Get response
res, err := s.client.PerformRequest("POST", path, params, body)
res, err := s.client.PerformRequestC(ctx, "POST", path, params, body)
if err != nil {
return nil, err
}
Expand Down
10 changes: 8 additions & 2 deletions clear_scroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package elastic

import (
"context"
"fmt"
"log"
"net/url"
Expand Down Expand Up @@ -60,8 +61,13 @@ func (s *ClearScrollService) Validate() error {
return nil
}

// Do executes the operation.
// Do runs DoC() with default context.
func (s *ClearScrollService) Do() (*ClearScrollResponse, error) {
return s.DoC(nil)
}

// DoC executes the operation.
func (s *ClearScrollService) DoC(ctx context.Context) (*ClearScrollResponse, error) {
// Check pre-conditions
if err := s.Validate(); err != nil {
return nil, err
Expand All @@ -77,7 +83,7 @@ func (s *ClearScrollService) Do() (*ClearScrollResponse, error) {
body := strings.Join(s.scrollId, ",")

// Get HTTP response
res, err := s.client.PerformRequest("DELETE", path, params, body)
res, err := s.client.PerformRequestC(ctx, "DELETE", path, params, body)
if err != nil {
return nil, err
}
Expand Down
21 changes: 20 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package elastic

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -1014,6 +1015,20 @@ func (c *Client) mustActiveConn() error {
// This is necessary for services that expect e.g. HTTP status 404 as a
// valid outcome (Exists, IndicesExists, IndicesTypeExists).
func (c *Client) PerformRequest(method, path string, params url.Values, body interface{}, ignoreErrors ...int) (*Response, error) {
return c.PerformRequestC(context.Background(), method, path, params, body, ignoreErrors...)
}

// PerformRequestC does a HTTP request to Elasticsearch.
// It can be cancelled via passed Context.
// It returns a response (which might be nil) and an error on failure.
//
// Optionally, a list of HTTP error codes to ignore can be passed.
// This is necessary for services that expect e.g. HTTP status 404 as a
// valid outcome (Exists, IndicesExists, IndicesTypeExists).
func (c *Client) PerformRequestC(ctx context.Context, method, path string, params url.Values, body interface{}, ignoreErrors ...int) (*Response, error) {
if ctx == nil {
ctx = context.Background()
}
start := time.Now().UTC()

c.mu.RLock()
Expand Down Expand Up @@ -1090,8 +1105,12 @@ func (c *Client) PerformRequest(method, path string, params url.Values, body int
c.dumpRequest((*http.Request)(req))

// Get response
res, err := c.c.Do((*http.Request)(req))
res, err := c.c.Do(((*http.Request)(req)).WithContext(ctx))
if err != nil {
// Return ctx error if available, so we can compare it
if ctx.Err() != nil {
err = ctx.Err()
}
n++
wait, ok, rerr := c.retrier.Retry(n, (*http.Request)(req), res, err)
if rerr != nil {
Expand Down
15 changes: 15 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package elastic

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -773,6 +774,20 @@ func TestPerformRequestRetryOnHttpError(t *testing.T) {
}
}

func TestPerformRequestCancelContext(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc()

client, err := NewClient()
if err != nil {
t.Fatal(err)
}
_, err = client.PerformRequestC(ctx, "GET", "/", nil, nil)
if err != context.Canceled {
t.Fatalf("expected to return ctx Cancelled error, got: %v", err.Error())
}
}

func TestPerformRequestNoRetryOnValidButUnsuccessfulHttpStatus(t *testing.T) {
var numFailedReqs int
fail := func(r *http.Request) (*http.Response, error) {
Expand Down
10 changes: 8 additions & 2 deletions cluster_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package elastic

import (
"context"
"fmt"
"net/url"
"strings"
Expand Down Expand Up @@ -142,8 +143,13 @@ func (s *ClusterHealthService) Validate() error {
return nil
}

// Do executes the operation.
// Do runs DoC() with default context.
func (s *ClusterHealthService) Do() (*ClusterHealthResponse, error) {
return s.DoC(nil)
}

// DoC executes the operation.
func (s *ClusterHealthService) DoC(ctx context.Context) (*ClusterHealthResponse, error) {
// Check pre-conditions
if err := s.Validate(); err != nil {
return nil, err
Expand All @@ -156,7 +162,7 @@ func (s *ClusterHealthService) Do() (*ClusterHealthResponse, error) {
}

// Get HTTP response
res, err := s.client.PerformRequest("GET", path, params, nil)
res, err := s.client.PerformRequestC(ctx, "GET", path, params, nil)
if err != nil {
return nil, err
}
Expand Down
10 changes: 8 additions & 2 deletions cluster_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package elastic

import (
"context"
"fmt"
"net/url"
"strings"
Expand Down Expand Up @@ -125,8 +126,13 @@ func (s *ClusterStateService) Validate() error {
return nil
}

// Do executes the operation.
// Do runs DoC() with default context.
func (s *ClusterStateService) Do() (*ClusterStateResponse, error) {
return s.DoC(nil)
}

// DoC executes the operation.
func (s *ClusterStateService) DoC(ctx context.Context) (*ClusterStateResponse, error) {
// Check pre-conditions
if err := s.Validate(); err != nil {
return nil, err
Expand All @@ -139,7 +145,7 @@ func (s *ClusterStateService) Do() (*ClusterStateResponse, error) {
}

// Get HTTP response
res, err := s.client.PerformRequest("GET", path, params, nil)
res, err := s.client.PerformRequestC(ctx, "GET", path, params, nil)
if err != nil {
return nil, err
}
Expand Down
10 changes: 8 additions & 2 deletions cluster_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package elastic

import (
"context"
"fmt"
"net/url"
"strings"
Expand Down Expand Up @@ -92,8 +93,13 @@ func (s *ClusterStatsService) Validate() error {
return nil
}

// Do executes the operation.
// Do runs DoC() with default context.
func (s *ClusterStatsService) Do() (*ClusterStatsResponse, error) {
return s.DoC(nil)
}

// DoC executes the operation.
func (s *ClusterStatsService) DoC(ctx context.Context) (*ClusterStatsResponse, error) {
// Check pre-conditions
if err := s.Validate(); err != nil {
return nil, err
Expand All @@ -106,7 +112,7 @@ func (s *ClusterStatsService) Do() (*ClusterStatsResponse, error) {
}

// Get HTTP response
res, err := s.client.PerformRequest("GET", path, params, nil)
res, err := s.client.PerformRequestC(ctx, "GET", path, params, nil)
if err != nil {
return nil, err
}
Expand Down
10 changes: 8 additions & 2 deletions count.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package elastic

import (
"context"
"fmt"
"net/url"
"strings"
Expand Down Expand Up @@ -275,8 +276,13 @@ func (s *CountService) Validate() error {
return nil
}

// Do executes the operation.
// Do runs DoC() with default context.
func (s *CountService) Do() (int64, error) {
return s.DoC(nil)
}

// DoC executes the operation.
func (s *CountService) DoC(ctx context.Context) (int64, error) {
// Check pre-conditions
if err := s.Validate(); err != nil {
return 0, err
Expand All @@ -301,7 +307,7 @@ func (s *CountService) Do() (int64, error) {
}

// Get HTTP response
res, err := s.client.PerformRequest("POST", path, params, body)
res, err := s.client.PerformRequestC(ctx, "POST", path, params, body)
if err != nil {
return 0, err
}
Expand Down
10 changes: 8 additions & 2 deletions create_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package elastic

import (
"context"
"errors"
"net/url"

Expand Down Expand Up @@ -71,8 +72,13 @@ func (b *CreateIndexService) Pretty(pretty bool) *CreateIndexService {
return b
}

// Do executes the operation.
// Do runs DoC() with default context.
func (b *CreateIndexService) Do() (*CreateIndexResult, error) {
return b.DoC(nil)
}

// DoC executes the operation.
func (b *CreateIndexService) DoC(ctx context.Context) (*CreateIndexResult, error) {
if b.index == "" {
return nil, errors.New("missing index name")
}
Expand Down Expand Up @@ -105,7 +111,7 @@ func (b *CreateIndexService) Do() (*CreateIndexResult, error) {
}

// Get response
res, err := b.client.PerformRequest("PUT", path, params, body)
res, err := b.client.PerformRequestC(ctx, "PUT", path, params, body)
if err != nil {
return nil, err
}
Expand Down
12 changes: 9 additions & 3 deletions delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package elastic

import (
"context"
"fmt"
"net/url"

Expand Down Expand Up @@ -66,9 +67,14 @@ func (s *DeleteService) Pretty(pretty bool) *DeleteService {
return s
}

// Do deletes the document. It fails if any of index, type, and identifier
// are missing.
// Do runs DoC() with default context.
func (s *DeleteService) Do() (*DeleteResult, error) {
return s.DoC(nil)
}

// DoC deletes the document. It fails if any of index, type, and identifier
// are missing.
func (s *DeleteService) DoC(ctx context.Context) (*DeleteResult, error) {
if s.index == "" {
return nil, ErrMissingIndex
}
Expand Down Expand Up @@ -105,7 +111,7 @@ func (s *DeleteService) Do() (*DeleteResult, error) {
}

// Get response
res, err := s.client.PerformRequest("DELETE", path, params, nil)
res, err := s.client.PerformRequestC(ctx, "DELETE", path, params, nil)
if err != nil {
return nil, err
}
Expand Down
Loading