Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Fix `dns` processor to handle IPv6 server addresses properly. {pull}44526[44526]
- Fix an issue where the Kafka output could get stuck if a proxied connection to the Kafka cluster was reset. {issue}44606[44606]
- Use Debian 11 to build linux/arm to match linux/amd64. Upgrades linux/arm64's statically linked glibc from 2.28 to 2.31. {issue}44816[44816]
- The Elasticsearch output now correctly applies exponential backoff when being throttled by 429s ("too many requests") from Elasticsarch. {issue}36926[36926] {pull}45073[45073]

*Auditbeat*

Expand Down
17 changes: 9 additions & 8 deletions libbeat/outputs/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,25 @@ import (
type backoffClient struct {
client NetworkClient

done chan struct{}
backoff backoff.Backoff
done chan struct{}
connectBackoff backoff.Backoff
publishBackoff backoff.Backoff
}

// WithBackoff wraps a NetworkClient, adding exponential backoff support to a network client if connection/publishing failed.
func WithBackoff(client NetworkClient, init, max time.Duration) NetworkClient {
done := make(chan struct{})
backoff := backoff.NewEqualJitterBackoff(done, init, max)
return &backoffClient{
client: client,
done: done,
backoff: backoff,
client: client,
done: done,
connectBackoff: backoff.NewEqualJitterBackoff(done, init, max),
publishBackoff: backoff.NewEqualJitterBackoff(done, init, max),
}
}

func (b *backoffClient) Connect(ctx context.Context) error {
err := b.client.Connect(ctx)
backoff.WaitOnError(b.backoff, err)
backoff.WaitOnError(b.connectBackoff, err)
return err
}

Expand All @@ -62,7 +63,7 @@ func (b *backoffClient) Publish(ctx context.Context, batch publisher.Batch) erro
if err != nil {
b.client.Close()
}
backoff.WaitOnError(b.backoff, err)
backoff.WaitOnError(b.publishBackoff, err)
return err
}

Expand Down
13 changes: 12 additions & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ import (
var (
errPayloadTooLarge = errors.New("the bulk payload is too large for the server. Consider to adjust `http.max_content_length` parameter in Elasticsearch or `bulk_max_size` in the beat. The batch has been dropped")

ErrTooOld = errors.New("Elasticsearch is too old. Please upgrade the instance. If you would like to connect to older instances set output.elasticsearch.allow_older_versions to true") //nolint:staticcheck //false positive
ErrTooOld = errors.New("Elasticsearch is too old. Please upgrade the instance. If you would like to connect to older instances set output.elasticsearch.allow_older_versions to true") //nolint:staticcheck //false positive (Elasticsearch should be capitalized)

errTooMany = errors.New("Elasticsearch returned error 429 Too Many Requests, throttling connection") //nolint:staticcheck //false positive (Elasticsearch should be capitalized)
)

// Client is an elasticsearch client.
Expand Down Expand Up @@ -269,6 +271,15 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error
} else {
batch.ACK()
}
return publishResultForStats(stats)
}

func publishResultForStats(stats bulkResultStats) error {
if stats.tooMany > 0 {
// We're being throttled by Elasticsearch, return an error so we
// retry the connection with exponential backoff
return errTooMany
}
return nil
}

Expand Down
20 changes: 20 additions & 0 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,26 @@ func TestCollectPipelinePublishFail(t *testing.T) {
assert.Equal(t, events, res)
}

func TestPublishResultForStats(t *testing.T) {
// publishResultForStats should return errTooMany if it is given
// stats with tooMany > 0, and nil otherwise (all other errors are
// either caused by encoding or connection failures, or are
// immediately retryable).
stats := bulkResultStats{
acked: 1,
duplicates: 2,
fails: 3,
nonIndexable: 4,
deadLetter: 5,
tooMany: 1,
}

assert.Equal(t, errTooMany, publishResultForStats(stats), "publishResultForStats should return errTooMany if tooMany > 0")

stats.tooMany = 0
assert.Nil(t, publishResultForStats(stats), "publishResultForStats should return nil if tooMany == 0")
}

func BenchmarkCollectPublishFailsNone(b *testing.B) {
logger, err := logp.NewDevelopmentLogger("")
require.NoError(b, err)
Expand Down