Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
611 changes: 174 additions & 437 deletions NOTICE.txt

Large diffs are not rendered by default.

161 changes: 80 additions & 81 deletions go.mod

Large diffs are not rendered by default.

355 changes: 174 additions & 181 deletions go.sum

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ import (
"reflect"
"strings"

"github.com/go-viper/mapstructure/v2"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
oteltranslate "github.com/elastic/beats/v7/libbeat/otelbeat/oteltranslate"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand Down Expand Up @@ -62,6 +60,9 @@ var defaultOptions = esToOTelOptions{
Pipeline: "",
ProxyURL: "",
Preset: "custom", // default is custom if not set
HostWorkerCfg: outputs.HostWorkerCfg{
Workers: 1,
},
}

// ToOTelConfig converts a Beat config into OTel elasticsearch exporter config
Expand Down Expand Up @@ -124,20 +125,32 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
"timeout": escfg.Transport.Timeout, // timeout
"idle_conn_timeout": escfg.Transport.IdleConnTimeout, // idle_connection_timeout

// For libbeat ES output, the "workers" setting controls the number of concurrent connections per ES host.
// For elasticsearchexporter, we can achieve the same concurrency by specifying "max_conns_per_host" setting
// as our otelconsumer sends data parallelly to the consumer.
// Also, we use http/1 in libbeat. To achieve parity, disable force_attempt_http2
"max_conns_per_host": escfg.NumWorkers(),
Comment thread
VihasMakwana marked this conversation as resolved.
Outdated
"force_attempt_http2": false,

// Retry
"retry": map[string]any{
"enabled": true,
"initial_interval": escfg.Backoff.Init, // backoff.init
"max_interval": escfg.Backoff.Max, // backoff.max
"max_retries": escfg.MaxRetries, // max_retries

},

// Batcher is experimental
"batcher": map[string]any{
"enabled": true,
"max_size": escfg.BulkMaxSize, // bulk_max_size
"min_size": 0, // 0 means immediately trigger a flush
"sending_queue": map[string]any{
"batch": map[string]any{
"max_size": escfg.BulkMaxSize, // bulk_max_size
"min_size": 0, // 0 means immediately trigger a flush
"sizer": "items",
},
"enabled": true,
"queue_size": getQueueSize(logger, output),
"block_on_overflow": true,
"wait_for_result": true,
"num_consumers": escfg.NumWorkers(),
Comment thread
VihasMakwana marked this conversation as resolved.
Comment thread
VihasMakwana marked this conversation as resolved.
},

"mapping": map[string]any{
Expand All @@ -162,10 +175,6 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
// Dynamic routing is disabled if output.elasticsearch.index is set
setIfNotNil(otelYAMLCfg, "logs_index", escfg.Index) // index

if err := typeSafetyCheck(otelYAMLCfg); err != nil {
return nil, err
}

return otelYAMLCfg, nil
}

Expand Down Expand Up @@ -196,24 +205,6 @@ func checkUnsupportedConfig(cfg *config.C, logger *logp.Logger) error {
return nil
}

// For type safety check
func typeSafetyCheck(value map[string]any) error {
// the value should match `elasticsearchexporter.Config` type.
// it throws an error if non existing key names are set
var result elasticsearchexporter.Config
d, _ := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
Squash: true,
Result: &result,
ErrorUnused: true,
})

err := d.Decode(value)
if err != nil {
return err
}
return err
}

Comment thread
VihasMakwana marked this conversation as resolved.
// Helper function to check if a struct is empty
func isStructEmpty(s any) bool {
return reflect.DeepEqual(s, reflect.Zero(reflect.TypeOf(s)).Interface())
Expand Down Expand Up @@ -244,3 +235,12 @@ func setIfNotNil(m map[string]any, key string, value any) {
m[key] = value
}
}

func getQueueSize(logger *logp.Logger, output *config.C) int {
size, err := output.Int("queue.mem.events", -1)
if err != nil {
logger.Debugf("Failed to get queue size: %v", err)
Comment thread
VihasMakwana marked this conversation as resolved.
return memqueue.DefaultEvents // return default queue.mem.events for sending_queue in case of an errr
Comment thread
cmacknz marked this conversation as resolved.
}
return int(size)
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ headers:
endpoints:
- http://localhost:9200/foo/bar
- http://localhost:9300/foo/bar
force_attempt_http2: false
idle_conn_timeout: 3s
logs_index: some-index
max_conns_per_host: 30
password: changeme
pipeline: some-ingest-pipeline
proxy_url: https://proxy.url
Expand All @@ -69,15 +71,21 @@ retry:
initial_interval: 42s
max_interval: 7m0s
max_retries: 3
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
num_consumers: 30
queue_size: 3200
wait_for_result: true
timeout: 1m30s
user: elastic
headers:
X-Header-1: foo
X-Bar-Header: bar
batcher:
enabled: true
max_size: 1600
min_size: 0
mapping:
mode: bodymap
compression: gzip
Expand All @@ -103,6 +111,7 @@ api_key: "TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA"
OTelCfg := `
endpoints:
- http://localhost:9200
force_attempt_http2: false
idle_conn_timeout: 3s
logs_index: some-index
retry:
Expand All @@ -111,12 +120,19 @@ retry:
max_interval: 1m0s
max_retries: 3
timeout: 1m30s
batcher:
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
mapping:
mode: bodymap
max_conns_per_host: 1
api_key: VGlOQUdHNEJhYU1kYUgxdFJmdVU6S25SNnlFNDFSclNvd2Iwa1EwSFdvQQ==
compression: gzip
compression_params:
Expand Down Expand Up @@ -154,6 +170,7 @@ logs_index: some-index
password: changeme
user: elastic
timeout: 1m30s
force_attempt_http2: false
mapping:
mode: bodymap
compression: gzip
Expand All @@ -169,20 +186,34 @@ compression_params:
presetName: "balanced",
output: commonOTelCfg + `
idle_conn_timeout: 3s
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
`,
},
{
presetName: "throughput",
output: commonOTelCfg + `
idle_conn_timeout: 15s
batcher:
max_conns_per_host: 4
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 4
queue_size: 12800
wait_for_result: true
`,
},
{
Expand All @@ -197,13 +228,21 @@ retry:
max_retries: 3
logs_index: some-index
password: changeme
force_attempt_http2: false
user: elastic
timeout: 1m30s
idle_conn_timeout: 1s
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
mapping:
mode: bodymap
compression: gzip
Expand All @@ -215,20 +254,34 @@ compression_params:
presetName: "latency",
output: commonOTelCfg + `
idle_conn_timeout: 1m0s
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 50
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 50
min_size: 0
num_consumers: 1
queue_size: 4100
wait_for_result: true
`,
},
{
presetName: "custom",
output: commonOTelCfg + `
idle_conn_timeout: 3s
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
`,
},
}
Expand Down Expand Up @@ -272,11 +325,19 @@ retry:
max_interval: 1m0s
max_retries: 3
timeout: 1m30s
force_attempt_http2: false
max_conns_per_host: 1
user: elastic
batcher:
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
mapping:
mode: bodymap
compression: gzip
Expand Down
4 changes: 3 additions & 1 deletion libbeat/publisher/queue/memqueue/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
c "github.com/elastic/elastic-agent-libs/config"
)

const DefaultEvents = 3200

type config struct {
Events int `config:"events" validate:"min=32"`
// This field is named MaxGetRequest because its logical effect is to give
Expand All @@ -36,7 +38,7 @@
}

var defaultConfig = config{
Events: 3200,
Events: DefaultEvents,
MaxGetRequest: 1600,
FlushTimeout: 10 * time.Second,
}
Expand All @@ -58,7 +60,7 @@
}
}
//nolint:gosimple // Actually want this conversion to be explicit since the types aren't definitionally equal.
return Settings{

Check failure on line 63 in libbeat/publisher/queue/memqueue/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

S1016: should convert config (type config) to Settings instead of using struct literal (staticcheck)
Events: config.Events,
MaxGetRequest: config.MaxGetRequest,
FlushTimeout: config.FlushTimeout,
Expand Down
16 changes: 12 additions & 4 deletions x-pack/filebeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,25 +637,33 @@ processors:
`
expectedExporter := `exporters:
elasticsearch:
batcher:
enabled: true
max_size: 1600
min_size: 0
compression: gzip
compression_params:
level: 1
endpoints:
- http://localhost:9200
force_attempt_http2: false
idle_conn_timeout: 3s
logs_index: index
mapping:
mode: bodymap
max_conns_per_host: 1
password: testing
retry:
enabled: true
initial_interval: 1s
max_interval: 1m0s
max_retries: 3
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
num_consumers: 1
queue_size: 3200
wait_for_result: true
timeout: 1m30s
user: admin`
expectedReceiver := `receivers:
Expand Down
Loading
Loading