Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}37816[37816]
- Set timeout of 1 minute for FQDN requests {pull}37756[37756]
- 'add_cloud_metadata' processor - improve AWS provider HTTP client overriding to support custom certificate bundle handling {pull}44189[44189]
- The Elasticsearch output now correctly applies exponential backoff when being throttled by 429s ("too many requests") from Elasticsearch. {issue}36926[36926] {pull}45073[45073]

*Auditbeat*

Expand Down
17 changes: 9 additions & 8 deletions libbeat/outputs/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,25 @@ import (
// backoffClient wraps a NetworkClient, adding exponential backoff on
// failed connect and publish attempts.
type backoffClient struct {
	client NetworkClient

	// done is closed to stop any in-progress backoff wait.
	done chan struct{}
	// Separate backoff state for connecting and publishing, so that a
	// run of publish failures (e.g. 429 throttling) and a run of connect
	// failures each back off independently.
	connectBackoff backoff.Backoff
	publishBackoff backoff.Backoff
}

// WithBackoff wraps a NetworkClient, adding exponential backoff support to a network client if connection/publishing failed.
func WithBackoff(client NetworkClient, init, max time.Duration) NetworkClient {
done := make(chan struct{})
backoff := backoff.NewEqualJitterBackoff(done, init, max)
return &backoffClient{
client: client,
done: done,
backoff: backoff,
client: client,
done: done,
connectBackoff: backoff.NewEqualJitterBackoff(done, init, max),
publishBackoff: backoff.NewEqualJitterBackoff(done, init, max),
}
}

// Connect attempts to connect the underlying client and applies the
// connect-specific backoff policy to the result, so repeated connection
// failures wait progressively longer before the caller retries.
func (b *backoffClient) Connect(ctx context.Context) error {
	err := b.client.Connect(ctx)
	backoff.WaitOnError(b.connectBackoff, err)
	return err
}

Expand All @@ -62,7 +63,7 @@ func (b *backoffClient) Publish(ctx context.Context, batch publisher.Batch) erro
if err != nil {
b.client.Close()
}
backoff.WaitOnError(b.backoff, err)
backoff.WaitOnError(b.publishBackoff, err)
return err
}

Expand Down
13 changes: 12 additions & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
var (
errPayloadTooLarge = errors.New("the bulk payload is too large for the server. Consider to adjust `http.max_content_length` parameter in Elasticsearch or `bulk_max_size` in the beat. The batch has been dropped")

ErrTooOld = errors.New("Elasticsearch is too old. Please upgrade the instance. If you would like to connect to older instances set output.elasticsearch.allow_older_versions to true.")
ErrTooOld = errors.New("Elasticsearch is too old. Please upgrade the instance. If you would like to connect to older instances set output.elasticsearch.allow_older_versions to true.") //nolint:staticcheck //false positive (Elasticsearch should be capitalized)

errTooMany = errors.New("Elasticsearch returned error 429 Too Many Requests, throttling connection") //nolint:staticcheck //false positive (Elasticsearch should be capitalized)
)

// Client is an elasticsearch client.
Expand Down Expand Up @@ -267,6 +269,15 @@
} else {
batch.ACK()
}
return publishResultForStats(stats)
}

// publishResultForStats maps the aggregated bulk-request statistics to the
// error result of a publish call. Only throttling (HTTP 429 responses,
// counted in tooMany) is reported as an error here, so the caller retries
// with exponential backoff; all other failure counts are handled elsewhere
// or are immediately retryable, and yield a nil result.
func publishResultForStats(stats bulkResultStats) error {
	if stats.tooMany == 0 {
		return nil
	}
	// Elasticsearch is throttling us with 429s; surface errTooMany so the
	// backoff wrapper slows down subsequent attempts.
	return errTooMany
}

Expand Down Expand Up @@ -351,7 +362,7 @@
client.log.Error("Elasticsearch output received unencoded publisher.Event")
continue
}
event := data[i].EncodedEvent.(*encodedEvent)

Check failure on line 365 in libbeat/outputs/elasticsearch/client.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Error return value is not checked (errcheck)
if event.err != nil {
// This means there was an error when encoding the event and it isn't
// ingestable, so report the error and continue.
Expand Down Expand Up @@ -477,7 +488,7 @@
itemMessage []byte,
stats *bulkResultStats,
) bool {
encodedEvent := event.EncodedEvent.(*encodedEvent)

Check failure on line 491 in libbeat/outputs/elasticsearch/client.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Error return value is not checked (errcheck)
if itemStatus < 300 {
if encodedEvent.deadLetter {
// This was ingested into the dead letter index, not the original target
Expand Down
20 changes: 20 additions & 0 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,26 @@ func TestCollectPipelinePublishFail(t *testing.T) {
assert.Equal(t, events, res)
}

// TestPublishResultForStats verifies that only throttling (tooMany > 0)
// surfaces as an error from publishResultForStats; every other stat is
// either caused by encoding or connection failures, or is immediately
// retryable, and must produce a nil result.
func TestPublishResultForStats(t *testing.T) {
	throttled := bulkResultStats{
		acked:        1,
		duplicates:   2,
		fails:        3,
		nonIndexable: 4,
		deadLetter:   5,
		tooMany:      1,
	}
	assert.Equal(t, errTooMany, publishResultForStats(throttled), "publishResultForStats should return errTooMany if tooMany > 0")

	// With no 429s recorded, the same stats must not yield an error.
	throttled.tooMany = 0
	assert.Nil(t, publishResultForStats(throttled), "publishResultForStats should return nil if tooMany == 0")
}

func BenchmarkCollectPublishFailsNone(b *testing.B) {
client, err := NewClient(
clientSettings{
Expand Down
Loading