Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .chloggen/elasticsearchexporter-update-batcher-cfg-api.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Update 'batcher' config to use internal struct instead of the one removed from the core.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [41225]

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
7 changes: 6 additions & 1 deletion exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type Config struct {
// Batcher is unused by default, in which case Flush will be used.
// If Batcher.Enabled is non-nil (i.e. batcher::enabled is specified),
// then the Flush will be ignored even if Batcher.Enabled is false.
// TODO: Deprecate and remove this section in favor of sending_queue::batch.
Batcher BatcherConfig `mapstructure:"batcher"`
}

Expand All @@ -107,7 +108,11 @@ type Config struct {
// This is a slightly modified version of exporterbatcher.Config,
// to enable tri-state Enabled: unset, false, true.
type BatcherConfig struct {
exporterhelper.BatcherConfig `mapstructure:",squash"`
Enabled bool `mapstructure:"enabled"`
FlushTimeout time.Duration `mapstructure:"flush_timeout"`
Sizer exporterhelper.RequestSizerType `mapstructure:"sizer"`
MinSize int64 `mapstructure:"min_size"`
MaxSize int64 `mapstructure:"max_size"`

// enabledSet tracks whether Enabled has been specified.
// If enabledSet is false, the exporter will perform its
Expand Down
30 changes: 9 additions & 21 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,9 @@ func TestConfig(t *testing.T) {
DateFormat: "%Y.%m.%d",
},
Batcher: BatcherConfig{
BatcherConfig: exporterhelper.BatcherConfig{ //nolint:staticcheck
FlushTimeout: 30 * time.Second,
SizeConfig: exporterhelper.SizeConfig{ //nolint:staticcheck
Sizer: exporterhelper.RequestSizerTypeItems,
MinSize: defaultBatcherMinSizeItems,
},
},
FlushTimeout: 30 * time.Second,
Sizer: exporterhelper.RequestSizerTypeItems,
MinSize: defaultBatcherMinSizeItems,
},
TelemetrySettings: TelemetrySettings{
LogFailedDocsInputRateLimit: time.Second,
Expand Down Expand Up @@ -199,13 +195,9 @@ func TestConfig(t *testing.T) {
DateFormat: "%Y.%m.%d",
},
Batcher: BatcherConfig{
BatcherConfig: exporterhelper.BatcherConfig{ //nolint:staticcheck
FlushTimeout: 30 * time.Second,
SizeConfig: exporterhelper.SizeConfig{ //nolint:staticcheck
Sizer: exporterhelper.RequestSizerTypeItems,
MinSize: defaultBatcherMinSizeItems,
},
},
FlushTimeout: 30 * time.Second,
Sizer: exporterhelper.RequestSizerTypeItems,
MinSize: defaultBatcherMinSizeItems,
},
TelemetrySettings: TelemetrySettings{
LogFailedDocsInputRateLimit: time.Second,
Expand Down Expand Up @@ -279,13 +271,9 @@ func TestConfig(t *testing.T) {
DateFormat: "%Y.%m.%d",
},
Batcher: BatcherConfig{
BatcherConfig: exporterhelper.BatcherConfig{ //nolint:staticcheck
FlushTimeout: 30 * time.Second,
SizeConfig: exporterhelper.SizeConfig{ //nolint:staticcheck
Sizer: exporterhelper.RequestSizerTypeItems,
MinSize: defaultBatcherMinSizeItems,
},
},
FlushTimeout: 30 * time.Second,
Sizer: exporterhelper.RequestSizerTypeItems,
MinSize: defaultBatcherMinSizeItems,
},
TelemetrySettings: TelemetrySettings{
LogFailedDocsInputRateLimit: time.Second,
Expand Down
12 changes: 8 additions & 4 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,9 @@ func TestExporterLogs(t *testing.T) {
})

t.Run("publish logs with dynamic id", func(t *testing.T) {
// TODO: remove this skip once the latest core dependency is pulled to support batch sizer
// Currently it's failing with "unknown sizer type"
t.Skip("skipping until the latest core dependency is pulled to support batch sizer")
Comment thread
andrzej-stencel marked this conversation as resolved.
t.Parallel()
exampleDocID := "abc123"
tableTests := []struct {
Expand Down Expand Up @@ -2457,12 +2460,13 @@ func TestExporterBatcher(t *testing.T) {
var requests []*http.Request
testauthID := component.NewID(component.MustNewType("authtest"))
exporter := newUnstartedTestLogsExporter(t, "http://testing.invalid", func(cfg *Config) {
batcherCfg := exporterhelper.NewDefaultBatcherConfig() //nolint:staticcheck
batcherCfg.Enabled = false
cfg.Batcher = BatcherConfig{
Enabled: false,
// sync bulk indexer is used without batching
BatcherConfig: batcherCfg,
enabledSet: true,
FlushTimeout: 200 * time.Millisecond,
Sizer: exporterhelper.RequestSizerTypeItems,
MinSize: 8192,
enabledSet: true,
}
cfg.Auth = configoptional.Some(configauth.Config{AuthenticatorID: testauthID})
cfg.Retry.Enabled = false
Expand Down
31 changes: 22 additions & 9 deletions exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,9 @@ func createDefaultConfig() component.Config {
},
IncludeSourceOnError: nil,
Batcher: BatcherConfig{
BatcherConfig: exporterhelper.BatcherConfig{ //nolint:staticcheck
FlushTimeout: 30 * time.Second,
SizeConfig: exporterhelper.SizeConfig{ //nolint:staticcheck
Sizer: exporterhelper.RequestSizerTypeItems,
MinSize: defaultBatcherMinSizeItems,
},
},
FlushTimeout: 30 * time.Second,
Sizer: exporterhelper.RequestSizerTypeItems,
MinSize: defaultBatcherMinSizeItems,
},
Flush: FlushSettings{
Bytes: 5e+6,
Expand Down Expand Up @@ -206,16 +202,33 @@ func exporterhelperOptions(
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithStart(start),
exporterhelper.WithShutdown(shutdown),
exporterhelper.WithQueue(cfg.QueueSettings),
}
qs := cfg.QueueSettings
if cfg.Batcher.enabledSet {
opts = append(opts, exporterhelper.WithBatcher(cfg.Batcher.BatcherConfig)) //nolint:staticcheck
qs.Sizer = exporterhelper.RequestSizerTypeItems // TODO: Delete once core dependency updated.
if cfg.Batcher.Enabled {
Comment thread
dmitryax marked this conversation as resolved.
qs.Batch = &exporterhelper.BatchConfig{
FlushTimeout: cfg.Batcher.FlushTimeout,
MinSize: cfg.Batcher.MinSize,
MaxSize: cfg.Batcher.MaxSize,
// TODO: Uncomment once core dependency updated.
// Sizer: cfg.Batcher.Sizer,
}

// If the deprecated batcher is enabled without a queue, enable blocking queue to replicate the
// behavior of the deprecated batcher.
if !qs.Enabled {
qs.Enabled = true
qs.WaitForResult = true
}
}

// Effectively disable timeout_sender because timeout is enforced in bulk indexer.
//
// We keep timeout_sender enabled in the async mode (Batcher.Enabled == nil),
// to ensure sending data to the background workers will not block indefinitely.
opts = append(opts, exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}))
}
opts = append(opts, exporterhelper.WithQueue(qs))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you do

	if err := qs.Validate(); err != nil {
		panic("invalid config " + err.Error())
	}

it is failing with

`batch` supports only `items` or `bytes` sizer`

which is what's failing integration tests. We did not validate our manually transformed config and this misconfiguration causes batcher to fail.

return opts
}