Bug: Context timeout error is swallowed when closing BulkIndexer in v7.17 #668

Open
karatekaneen opened this issue May 16, 2023 · 1 comment

Comments

@karatekaneen

While trying to handle timeouts gracefully, I noticed that the error is swallowed: Close returns nil, so it looks like everything went fine even though the timeout was triggered and the whole operation was aborted.

To reproduce

package elastic

import (
	"context"
	"errors"
	"log"
	"strings"
	"testing"
	"time"

	"github.com/elastic/go-elasticsearch/v7"
	"github.com/elastic/go-elasticsearch/v7/esutil"
)

func TestNewBulkIndexer(t *testing.T) {
	// I don't have any elasticsearch running so this will definitely time out
	es, err := elasticsearch.NewClient(elasticsearch.Config{
		RetryBackoff: func(i int) time.Duration { return time.Duration(i) * 1000 * time.Millisecond },
		MaxRetries:   5,
	})
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Client:     es,     // The Elasticsearch client
		Index:      "test", // The default index name
		NumWorkers: 4,      // The number of worker goroutines (default: number of CPUs)
		FlushBytes: 5e+6,   // The flush threshold in bytes (default: 5M)
	})
	if err != nil {
		log.Fatalf("Error creating the indexer: %s", err)
	}
	err = indexer.Add(
		context.Background(),
		esutil.BulkIndexerItem{
			Action:     "index",
			DocumentID: "1",
			Body:       strings.NewReader(`{"title":"Test"}`),
		},
	)
	if err != nil {
		log.Fatalf("Unexpected error: %s", err)
	}

	// Add a very narrow timeout
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	// Close the bulkIndexer
	err = indexer.Close(ctx)
	// This should be "context deadline exceeded" but is nil for some reason
	if !errors.Is(err, context.DeadlineExceeded) {
		t.Errorf(`Expected "context deadline exceeded" - Got %v`, err)
	}
}
@floretan

We ran into the same issue, which was tricky and confusing to debug. We only figured it out because we saw timeouts from the pubsub subscription that feeds the bulk indexer while the logs showed no errors.

In our case we have a continuous indexing job running, so under normal circumstances we almost never reach the "close" step. It would be good to have the error available in the OnFailure handler for an individual item or the OnError handler for the bulk indexer itself.
