Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18,334 changes: 11,453 additions & 6,881 deletions NOTICE.txt

Large diffs are not rendered by default.

385 changes: 202 additions & 183 deletions go.mod

Large diffs are not rendered by default.

882 changes: 465 additions & 417 deletions go.sum

Large diffs are not rendered by default.

28 changes: 20 additions & 8 deletions libbeat/otelbeat/beatconverter/beatconverter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ exporters:
- https://localhost:9200
idle_conn_timeout: 3s
logs_index: form-otel-exporter
num_workers: 1
password: changeme
retry:
enabled: true
Expand All @@ -44,10 +43,17 @@ exporters:
max_retries: 3
user: elastic
timeout: 1m30s
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
mapping:
mode: bodymap
compression: gzip
Expand Down Expand Up @@ -180,7 +186,6 @@ exporters:
- https://es-hostname.elastic.co:443
idle_conn_timeout: 3s
logs_index: form-otel-exporter
num_workers: 1
password: password
retry:
enabled: true
Expand All @@ -189,10 +194,17 @@ exporters:
max_retries: 3
user: elastic-cloud
timeout: 1m30s
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
mapping:
mode: bodymap
compression: gzip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ import (
"reflect"
"strings"

"github.com/go-viper/mapstructure/v2"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
oteltranslate "github.com/elastic/beats/v7/libbeat/otelbeat/oteltranslate"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand Down Expand Up @@ -67,6 +65,9 @@ var defaultOptions = esToOTelOptions{
Pipeline: "",
ProxyURL: "",
Preset: "custom", // default is custom if not set
HostWorkerCfg: outputs.HostWorkerCfg{
Workers: 1,
},
}

// ToOTelConfig converts a Beat config into OTel elasticsearch exporter config
Expand Down Expand Up @@ -124,34 +125,40 @@ func ToOTelConfig(output *config.C) (map[string]any, error) {
return nil, fmt.Errorf("cannot convert SSL config into OTel: %w", err)
}

// get number of workers
workers := 1 // Default value is 1
if escfg.NumWorkers() > 1 {
workers = escfg.NumWorkers()
}
otelYAMLCfg := map[string]any{
"logs_index": escfg.Index, // index
"endpoints": hosts, // hosts, protocol, path, port
"num_workers": workers, // worker/workers
"logs_index": escfg.Index, // index
"endpoints": hosts, // hosts, protocol, path, port

// ClientConfig
"timeout": escfg.Transport.Timeout, // timeout
"idle_conn_timeout": escfg.Transport.IdleConnTimeout, // idle_connection_timeout

// max_conns_per_host is a "hard" limit on number of open connections.
// Ideally, escfg.NumWorkers() should map to num_consumer, but we had a bug in upstream
// where it could spin as many goroutines as it liked.
// Given that batcher implementation can change and it has a history of such changes,
// let's keep max_conns_per_host setting for now and remove it once exporterhelper is stable.
"max_conns_per_host": escfg.NumWorkers(),

// Retry
"retry": map[string]any{
"enabled": true,
"initial_interval": escfg.Backoff.Init, // backoff.init
"max_interval": escfg.Backoff.Max, // backoff.max
"max_retries": escfg.MaxRetries, // max_retries

},

// Batcher is experimental
"batcher": map[string]any{
"enabled": true,
"max_size": escfg.BulkMaxSize, // bulk_max_size
"min_size": 0, // 0 means immediately trigger a flush
"sending_queue": map[string]any{
"batch": map[string]any{
"max_size": escfg.BulkMaxSize, // bulk_max_size
"min_size": 0, // 0 means immediately trigger a flush
"sizer": "items",
},
"enabled": true,
"queue_size": getQueueSize(output),
"block_on_overflow": true,
"wait_for_result": true,
"num_consumers": escfg.NumWorkers(),
},

"mapping": map[string]any{
Expand All @@ -178,31 +185,9 @@ func ToOTelConfig(output *config.C) (map[string]any, error) {
setIfNotNil(otelYAMLCfg, "proxy_url", escfg.ProxyURL) // proxy_url
setIfNotNil(otelYAMLCfg, "pipeline", escfg.Pipeline) // pipeline

if err := typeSafetyCheck(otelYAMLCfg); err != nil {
return nil, err
}

return otelYAMLCfg, nil
}

// For type safety check
func typeSafetyCheck(value map[string]any) error {
// the value should match `elasticsearchexporter.Config` type.
// it throws an error if non existing key names are set
var result elasticsearchexporter.Config
d, _ := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
Squash: true,
Result: &result,
ErrorUnused: true,
})

err := d.Decode(value)
if err != nil {
return err
}
return err
}

// Helper function to check if a struct is empty
func isStructEmpty(s any) bool {
return reflect.DeepEqual(s, reflect.Zero(reflect.TypeOf(s)).Interface())
Expand Down Expand Up @@ -233,3 +218,11 @@ func setIfNotNil(m map[string]any, key string, value any) {
m[key] = value
}
}

func getQueueSize(output *config.C) int {
size, err := output.Int("queue.mem.events", -1)
if err != nil {
return memqueue.DefaultEvents // return default queue.mem.events for sending_queue in case of an errr
}
return int(size)
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ endpoints:
- http://localhost:9300/foo/bar
idle_conn_timeout: 3s
logs_index: some-index
num_workers: 30
max_conns_per_host: 30
password: changeme
pipeline: some-ingest-pipeline
proxy_url: https://proxy.url
Expand All @@ -69,15 +69,21 @@ retry:
initial_interval: 42s
max_interval: 7m0s
max_retries: 3
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
num_consumers: 30
queue_size: 3200
wait_for_result: true
timeout: 1m30s
user: elastic
headers:
X-Header-1: foo
X-Bar-Header: bar
batcher:
enabled: true
max_size: 1600
min_size: 0
mapping:
mode: bodymap
compression: gzip
Expand Down Expand Up @@ -106,19 +112,25 @@ endpoints:
- http://localhost:9200
idle_conn_timeout: 3s
logs_index: some-index
num_workers: 1
retry:
enabled: true
initial_interval: 1s
max_interval: 1m0s
max_retries: 3
timeout: 1m30s
batcher:
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
mapping:
mode: bodymap
max_conns_per_host: 1
api_key: VGlOQUdHNEJhYU1kYUgxdFJmdVU6S25SNnlFNDFSclNvd2Iwa1EwSFdvQQ==
compression: gzip
compression_params:
Expand Down Expand Up @@ -172,22 +184,34 @@ compression_params:
presetName: "balanced",
output: commonOTelCfg + `
idle_conn_timeout: 3s
num_workers: 1
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
`,
},
{
presetName: "throughput",
output: commonOTelCfg + `
idle_conn_timeout: 15s
num_workers: 4
batcher:
max_conns_per_host: 4
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 4
queue_size: 12800
wait_for_result: true
`,
},
{
Expand All @@ -205,11 +229,17 @@ password: changeme
user: elastic
timeout: 1m30s
idle_conn_timeout: 1s
num_workers: 1
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
mapping:
mode: bodymap
compression: gzip
Expand All @@ -221,22 +251,34 @@ compression_params:
presetName: "latency",
output: commonOTelCfg + `
idle_conn_timeout: 1m0s
num_workers: 1
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 50
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 50
min_size: 0
num_consumers: 1
queue_size: 4100
wait_for_result: true
`,
},
{
presetName: "custom",
output: commonOTelCfg + `
idle_conn_timeout: 3s
num_workers: 1
batcher:
max_conns_per_host: 1
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
`,
},
}
Expand Down Expand Up @@ -275,18 +317,24 @@ endpoints:
idle_conn_timeout: 3s
logs_index: some-index
password: changeme
num_workers: 1
retry:
enabled: true
initial_interval: 1s
max_interval: 1m0s
max_retries: 3
timeout: 1m30s
max_conns_per_host: 1
user: elastic
batcher:
sending_queue:
batch:
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
max_size: 1600
min_size: 0
num_consumers: 1
queue_size: 3200
wait_for_result: true
mapping:
mode: bodymap
{{ if gt . 0 }}
Expand Down
Loading
Loading