diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a4c3648af99c..04096b992e19 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -136,6 +136,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403] - Fix `dns` processor to handle IPv6 server addresses properly. {pull}44526[44526] - Fix an issue where the Kafka output could get stuck if a proxied connection to the Kafka cluster was reset. {issue}44606[44606] - Use Debian 11 to build linux/arm to match linux/amd64. Upgrades linux/arm64's statically linked glibc from 2.28 to 2.31. {issue}44816[44816] +- The Elasticsearch output now correctly applies exponential backoff when being throttled by 429s ("too many requests") from Elasticsearch. {issue}36926[36926] {pull}45073[45073] *Auditbeat* diff --git a/libbeat/outputs/backoff.go b/libbeat/outputs/backoff.go index 87d94bb66d0d..502d3d67d1af 100644 --- a/libbeat/outputs/backoff.go +++ b/libbeat/outputs/backoff.go @@ -30,24 +30,25 @@ import ( type backoffClient struct { client NetworkClient - done chan struct{} - backoff backoff.Backoff + done chan struct{} + connectBackoff backoff.Backoff + publishBackoff backoff.Backoff } // WithBackoff wraps a NetworkClient, adding exponential backoff support to a network client if connection/publishing failed. 
func WithBackoff(client NetworkClient, init, max time.Duration) NetworkClient { done := make(chan struct{}) - backoff := backoff.NewEqualJitterBackoff(done, init, max) return &backoffClient{ - client: client, - done: done, - backoff: backoff, + client: client, + done: done, + connectBackoff: backoff.NewEqualJitterBackoff(done, init, max), + publishBackoff: backoff.NewEqualJitterBackoff(done, init, max), } } func (b *backoffClient) Connect(ctx context.Context) error { err := b.client.Connect(ctx) - backoff.WaitOnError(b.backoff, err) + backoff.WaitOnError(b.connectBackoff, err) return err } @@ -62,7 +63,7 @@ func (b *backoffClient) Publish(ctx context.Context, batch publisher.Batch) erro if err != nil { b.client.Close() } - backoff.WaitOnError(b.backoff, err) + backoff.WaitOnError(b.publishBackoff, err) return err } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index c39a7c663505..599b827048bd 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -43,7 +43,9 @@ import ( var ( errPayloadTooLarge = errors.New("the bulk payload is too large for the server. Consider to adjust `http.max_content_length` parameter in Elasticsearch or `bulk_max_size` in the beat. The batch has been dropped") - ErrTooOld = errors.New("Elasticsearch is too old. Please upgrade the instance. If you would like to connect to older instances set output.elasticsearch.allow_older_versions to true") //nolint:staticcheck //false positive + ErrTooOld = errors.New("Elasticsearch is too old. Please upgrade the instance. 
If you would like to connect to older instances set output.elasticsearch.allow_older_versions to true") //nolint:staticcheck //false positive (Elasticsearch should be capitalized) + + errTooMany = errors.New("Elasticsearch returned error 429 Too Many Requests, throttling connection") //nolint:staticcheck //false positive (Elasticsearch should be capitalized) ) // Client is an elasticsearch client. @@ -269,6 +271,15 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error } else { batch.ACK() } + return publishResultForStats(stats) +} + +func publishResultForStats(stats bulkResultStats) error { + if stats.tooMany > 0 { + // We're being throttled by Elasticsearch, return an error so we + // retry the connection with exponential backoff + return errTooMany + } return nil } diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 3c2f69f4a25b..2c4c598fdf68 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -636,6 +636,26 @@ func TestCollectPipelinePublishFail(t *testing.T) { assert.Equal(t, events, res) } +func TestPublishResultForStats(t *testing.T) { + // publishResultForStats should return errTooMany if it is given + // stats with tooMany > 0, and nil otherwise (all other errors are + // either caused by encoding or connection failures, or are + // immediately retryable). + stats := bulkResultStats{ + acked: 1, + duplicates: 2, + fails: 3, + nonIndexable: 4, + deadLetter: 5, + tooMany: 1, + } + + assert.Equal(t, errTooMany, publishResultForStats(stats), "publishResultForStats should return errTooMany if tooMany > 0") + + stats.tooMany = 0 + assert.Nil(t, publishResultForStats(stats), "publishResultForStats should return nil if tooMany == 0") +} + func BenchmarkCollectPublishFailsNone(b *testing.B) { logger, err := logp.NewDevelopmentLogger("") require.NoError(b, err)