diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0c2765b28d7a..abb6e01f27f5 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -80,6 +80,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}[37816][37816] - Set timeout of 1 minute for FQDN requests {pull}37756[37756] - 'add_cloud_metadata' processor - improve AWS provider HTTP client overriding to support custom certificate bundle handling {pull}44189[44189] +- The Elasticsearch output now correctly applies exponential backoff when being throttled by 429s ("too many requests") from Elasticsearch. {issue}36926[36926] {pull}45073[45073] *Auditbeat* diff --git a/libbeat/outputs/backoff.go b/libbeat/outputs/backoff.go index 87d94bb66d0d..502d3d67d1af 100644 --- a/libbeat/outputs/backoff.go +++ b/libbeat/outputs/backoff.go @@ -30,24 +30,25 @@ import ( type backoffClient struct { client NetworkClient - done chan struct{} - backoff backoff.Backoff + done chan struct{} + connectBackoff backoff.Backoff + publishBackoff backoff.Backoff } // WithBackoff wraps a NetworkClient, adding exponential backoff support to a network client if connection/publishing failed. 
func WithBackoff(client NetworkClient, init, max time.Duration) NetworkClient { done := make(chan struct{}) - backoff := backoff.NewEqualJitterBackoff(done, init, max) return &backoffClient{ - client: client, - done: done, - backoff: backoff, + client: client, + done: done, + connectBackoff: backoff.NewEqualJitterBackoff(done, init, max), + publishBackoff: backoff.NewEqualJitterBackoff(done, init, max), } } func (b *backoffClient) Connect(ctx context.Context) error { err := b.client.Connect(ctx) - backoff.WaitOnError(b.backoff, err) + backoff.WaitOnError(b.connectBackoff, err) return err } @@ -62,7 +63,7 @@ func (b *backoffClient) Publish(ctx context.Context, batch publisher.Batch) erro if err != nil { b.client.Close() } - backoff.WaitOnError(b.backoff, err) + backoff.WaitOnError(b.publishBackoff, err) return err } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 56f28cdbf30d..b539bedfcedc 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -43,7 +43,9 @@ import ( var ( errPayloadTooLarge = errors.New("the bulk payload is too large for the server. Consider to adjust `http.max_content_length` parameter in Elasticsearch or `bulk_max_size` in the beat. The batch has been dropped") - ErrTooOld = errors.New("Elasticsearch is too old. Please upgrade the instance. If you would like to connect to older instances set output.elasticsearch.allow_older_versions to true.") + ErrTooOld = errors.New("Elasticsearch is too old. Please upgrade the instance. If you would like to connect to older instances set output.elasticsearch.allow_older_versions to true.") //nolint:staticcheck //false positive (Elasticsearch should be capitalized) + + errTooMany = errors.New("Elasticsearch returned error 429 Too Many Requests, throttling connection") //nolint:staticcheck //false positive (Elasticsearch should be capitalized) ) // Client is an elasticsearch client. 
@@ -267,6 +269,15 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error } else { batch.ACK() } + return publishResultForStats(stats) +} + +func publishResultForStats(stats bulkResultStats) error { + if stats.tooMany > 0 { + // We're being throttled by Elasticsearch, return an error so we + // retry the connection with exponential backoff + return errTooMany + } return nil } diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index f6322383cb5a..00485bb2e83e 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -616,6 +616,26 @@ func TestCollectPipelinePublishFail(t *testing.T) { assert.Equal(t, events, res) } +func TestPublishResultForStats(t *testing.T) { + // publishResultForStats should return errTooMany if it is given + // stats with tooMany > 0, and nil otherwise (all other errors are + // either caused by encoding or connection failures, or are + // immediately retryable). + stats := bulkResultStats{ + acked: 1, + duplicates: 2, + fails: 3, + nonIndexable: 4, + deadLetter: 5, + tooMany: 1, + } + + assert.Equal(t, errTooMany, publishResultForStats(stats), "publishResultForStats should return errTooMany if tooMany > 0") + + stats.tooMany = 0 + assert.Nil(t, publishResultForStats(stats), "publishResultForStats should return nil if tooMany == 0") +} + func BenchmarkCollectPublishFailsNone(b *testing.B) { client, err := NewClient( clientSettings{