Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
611 changes: 174 additions & 437 deletions NOTICE.txt

Large diffs are not rendered by default.

161 changes: 80 additions & 81 deletions go.mod

Large diffs are not rendered by default.

355 changes: 174 additions & 181 deletions go.sum

Large diffs are not rendered by default.

26 changes: 20 additions & 6 deletions libbeat/otelbeat/beatconverter/beatconverter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,17 @@ exporters:
max_retries: 3
user: elastic
timeout: 1m30s
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
mapping:
mode: bodymap
compression: gzip
Expand Down Expand Up @@ -187,10 +194,17 @@ exporters:
max_retries: 3
user: elastic-cloud
timeout: 1m30s
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
mapping:
mode: bodymap
compression: gzip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ import (
"reflect"
"strings"

"github.com/go-viper/mapstructure/v2"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
oteltranslate "github.com/elastic/beats/v7/libbeat/otelbeat/oteltranslate"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand Down Expand Up @@ -62,6 +60,9 @@ var defaultOptions = esToOTelOptions{
Pipeline: "",
ProxyURL: "",
Preset: "custom", // default is custom if not set
HostWorkerCfg: outputs.HostWorkerCfg{
Workers: 1,
},
}

// ToOTelConfig converts a Beat config into OTel elasticsearch exporter config
Expand Down Expand Up @@ -124,20 +125,32 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
"timeout": escfg.Transport.Timeout, // timeout
"idle_conn_timeout": escfg.Transport.IdleConnTimeout, // idle_connection_timeout

// max_conns_per_host is a "hard" limit on number of open connections.
// Ideally, escfg.NumWorkers() should map to num_consumer, but we had a bug in upstream
// where it could spin as many goroutines as it liked.
// Given that batcher implementation can change and it has a history of such changes,
// let's keep max_conns_per_host setting for now and remove it once exporterhelper is stable.
"max_conns_per_host": escfg.NumWorkers(),

// Retry
"retry": map[string]any{
"enabled": true,
"initial_interval": escfg.Backoff.Init, // backoff.init
"max_interval": escfg.Backoff.Max, // backoff.max
"max_retries": escfg.MaxRetries, // max_retries

},

// Batcher is experimental
"batcher": map[string]any{
"enabled": true,
"max_size": escfg.BulkMaxSize, // bulk_max_size
"min_size": 0, // 0 means immediately trigger a flush
"sending_queue": map[string]any{
"batch": map[string]any{
"max_size": escfg.BulkMaxSize, // bulk_max_size
"min_size": 0, // 0 means immediately trigger a flush
"sizer": "items",
},
"enabled": true,
"queue_size": getQueueSize(logger, output),
"block_on_overflow": true,
"wait_for_result": true,
"num_consumers": escfg.NumWorkers(),
Comment thread
VihasMakwana marked this conversation as resolved.
},

"mapping": map[string]any{
Expand Down Expand Up @@ -166,10 +179,6 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
// Dynamic routing is disabled if output.elasticsearch.index is set
setIfNotNil(otelYAMLCfg, "logs_index", escfg.Index) // index

if err := typeSafetyCheck(otelYAMLCfg); err != nil {
return nil, err
}

return otelYAMLCfg, nil
}

Expand Down Expand Up @@ -200,24 +209,6 @@ func checkUnsupportedConfig(cfg *config.C, logger *logp.Logger) error {
return nil
}

// For type safety check
func typeSafetyCheck(value map[string]any) error {
// the value should match `elasticsearchexporter.Config` type.
// it throws an error if non existing key names are set
var result elasticsearchexporter.Config
d, _ := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
Squash: true,
Result: &result,
ErrorUnused: true,
})

err := d.Decode(value)
if err != nil {
return err
}
return err
}

Comment thread
VihasMakwana marked this conversation as resolved.
// Helper function to check if a struct is empty
func isStructEmpty(s any) bool {
return reflect.DeepEqual(s, reflect.Zero(reflect.TypeOf(s)).Interface())
Expand Down Expand Up @@ -248,3 +239,12 @@ func setIfNotNil(m map[string]any, key string, value any) {
m[key] = value
}
}

func getQueueSize(logger *logp.Logger, output *config.C) int {
size, err := output.Int("queue.mem.events", -1)
if err != nil {
logger.Debugf("Failed to get queue size: %v", err)
Comment thread
VihasMakwana marked this conversation as resolved.
return memqueue.DefaultEvents // return default queue.mem.events for sending_queue in case of an errr
Comment thread
cmacknz marked this conversation as resolved.
}
return int(size)
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ endpoints:
- http://localhost:9300/foo/bar
idle_conn_timeout: 3s
logs_index: some-index
max_conns_per_host: 30
password: changeme
pipeline: some-ingest-pipeline
proxy_url: https://proxy.url
Expand All @@ -71,15 +72,21 @@ retry:
initial_interval: 42s
max_interval: 7m0s
max_retries: 3
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
num_consumers: 30
queue_size: 3200
wait_for_result: true
timeout: 1m30s
user: elastic
headers:
X-Header-1: foo
X-Bar-Header: bar
batcher:
enabled: true
max_size: 1600
min_size: 0
mapping:
mode: bodymap
compression: gzip
Expand Down Expand Up @@ -113,12 +120,19 @@ retry:
max_interval: 1m0s
max_retries: 3
timeout: 1m30s
batcher:
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
mapping:
mode: bodymap
max_conns_per_host: 1
api_key: VGlOQUdHNEJhYU1kYUgxdFJmdVU6S25SNnlFNDFSclNvd2Iwa1EwSFdvQQ==
compression: gzip
compression_params:
Expand Down Expand Up @@ -171,20 +185,34 @@ compression_params:
presetName: "balanced",
output: commonOTelCfg + `
idle_conn_timeout: 3s
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
`,
},
{
presetName: "throughput",
output: commonOTelCfg + `
idle_conn_timeout: 15s
batcher:
max_conns_per_host: 4
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 4
queue_size: 12800
wait_for_result: true
`,
},
{
Expand All @@ -202,10 +230,17 @@ password: changeme
user: elastic
timeout: 1m30s
idle_conn_timeout: 1s
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
mapping:
mode: bodymap
compression: gzip
Expand All @@ -217,20 +252,34 @@ compression_params:
presetName: "latency",
output: commonOTelCfg + `
idle_conn_timeout: 1m0s
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 50
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 50
min_size: 0
num_consumers: 1
queue_size: 4100
wait_for_result: true
`,
},
{
presetName: "custom",
output: commonOTelCfg + `
idle_conn_timeout: 3s
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
`,
},
}
Expand Down Expand Up @@ -274,11 +323,18 @@ retry:
max_interval: 1m0s
max_retries: 3
timeout: 1m30s
max_conns_per_host: 1
user: elastic
batcher:
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
mapping:
mode: bodymap
{{ if gt . 0 }}
Expand Down
4 changes: 3 additions & 1 deletion libbeat/publisher/queue/memqueue/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
c "github.com/elastic/elastic-agent-libs/config"
)

const DefaultEvents = 3200

type config struct {
Events int `config:"events" validate:"min=32"`
// This field is named MaxGetRequest because its logical effect is to give
Expand All @@ -36,7 +38,7 @@
}

var defaultConfig = config{
Events: 3200,
Events: DefaultEvents,
MaxGetRequest: 1600,
FlushTimeout: 10 * time.Second,
}
Expand All @@ -58,7 +60,7 @@
}
}
//nolint:gosimple // Actually want this conversion to be explicit since the types aren't definitionally equal.
return Settings{

Check failure on line 63 in libbeat/publisher/queue/memqueue/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

S1016: should convert config (type config) to Settings instead of using struct literal (staticcheck)
Events: config.Events,
MaxGetRequest: config.MaxGetRequest,
FlushTimeout: config.FlushTimeout,
Expand Down
15 changes: 11 additions & 4 deletions x-pack/filebeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,10 +637,6 @@ processors:
`
expectedExporter := `exporters:
elasticsearch:
batcher:
enabled: true
max_size: 1600
min_size: 0
compression: gzip
compression_params:
level: 1
Expand All @@ -650,12 +646,23 @@ processors:
logs_index: index
mapping:
mode: bodymap
max_conns_per_host: 1
password: testing
retry:
enabled: true
initial_interval: 1s
max_interval: 1m0s
max_retries: 3
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
num_consumers: 1
queue_size: 3200
wait_for_result: true
timeout: 1m30s
user: admin`
expectedReceiver := `receivers:
Expand Down
Loading
Loading