From 82fb9a505de89ec66bef870117e8374fc6141405 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Wed, 10 Dec 2025 20:57:16 +0000 Subject: [PATCH] change the defaults of otelcol.exporter queue `batch` block BREAKING CHANGE: `min_size` has changed from `8192` to `2000` and `max_size` has changed from `0` to `3000` --- .../components/otelcol-queue-batch-block.md | 9 +- internal/component/otelcol/config_queue.go | 12 ++ .../loadbalancing/loadbalancing_test.go | 178 ++++++++++++++++++ .../otelcol/exporter/otlp/otlp_test.go | 75 ++++++++ .../exporter/otlphttp/otlphttp_test.go | 75 ++++++++ 5 files changed, 347 insertions(+), 2 deletions(-) diff --git a/docs/sources/shared/reference/components/otelcol-queue-batch-block.md b/docs/sources/shared/reference/components/otelcol-queue-batch-block.md index 952c9abb5e8..4bc8cfaa6ba 100644 --- a/docs/sources/shared/reference/components/otelcol-queue-batch-block.md +++ b/docs/sources/shared/reference/components/otelcol-queue-batch-block.md @@ -4,13 +4,18 @@ description: Shared content, otelcol queue batch block headless: true --- +Batching is disabled by default. +To enable it, explicitly include `batch {}` in your Alloy configuration. +You do not need to include a `batch {}` block in your `otelcol.exporter` if you already use a `otelcol.processor.batch` component, +although batching in the exporter is the prefered method because it is more flexible. + The following arguments are supported: | Name | Type | Description | Default | Required | | --------------- | ----------- | ---------------------------------------------------------------------------------------------------------- | ------------ | -------- | | `flush_timeout` | `duration` | Time after which a batch will be sent regardless of its size. Must be a non-zero value. | `"200ms"` | no | -| `min_size` | `number` | The minimum size of a batch. | `8192` | no | -| `max_size` | `number` | The maximum size of a batch, enables batch splitting. | | no | +| `min_size` | `number` | The minimum size of a batch. | `2000` | no | +| `max_size` | `number` | The maximum size of a batch, enables batch splitting. | `3000` | no | | `sizer` | `string` | How the queue and batching is measured. Overrides the sizer set at the `sending_queue` level for batching. | `"items"` | no | If configured, `max_size` must be greater than or equal to `min_size`. diff --git a/internal/component/otelcol/config_queue.go b/internal/component/otelcol/config_queue.go index 5c22526af2a..def7f5c4432 100644 --- a/internal/component/otelcol/config_queue.go +++ b/internal/component/otelcol/config_queue.go @@ -153,8 +153,20 @@ type BatchConfig struct { Sizer string `alloy:"sizer,attr,optional"` } +var _ syntax.Defaulter = (*BatchConfig)(nil) + var defaultBatchConfig = otelexporterhelper.NewDefaultQueueConfig().Batch +// SetToDefault implements syntax.Defaulter. +func (args *BatchConfig) SetToDefault() { + *args = BatchConfig{ + FlushTimeout: 200 * time.Millisecond, + MinSize: 2000, + MaxSize: 3000, + Sizer: "items", + } +} + // Validate returns an error if args is invalid. func (args *BatchConfig) Validate() error { if args == nil { diff --git a/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go b/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go index b5bfdb45b53..b2fdc96a42d 100644 --- a/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go +++ b/internal/component/otelcol/exporter/loadbalancing/loadbalancing_test.go @@ -699,6 +699,184 @@ func TestConfigConversion(t *testing.T) { } } +func TestQueueBatchConfig(t *testing.T) { + tests := []struct { + testName string + alloyCfg string + expected otelcol.QueueArguments + }{ + { + testName: "default", + alloyCfg: ` + resolver { + static { + hostnames = ["endpoint-1"] + } + } + protocol { + otlp { + client {} + } + } + sending_queue { + batch {} + } + `, + expected: otelcol.QueueArguments{ + Enabled: true, + NumConsumers: 10, + QueueSize: 1000, + Sizer: "requests", + Batch: &otelcol.BatchConfig{ + FlushTimeout: 200 * time.Millisecond, + MinSize: 2000, + MaxSize: 3000, + Sizer: "items", + }, + }, + }, + { + testName: "explicit_batch", + alloyCfg: ` + resolver { + static { + hostnames = ["endpoint-1"] + } + } + protocol { + otlp { + client {} + } + } + sending_queue { + batch { + flush_timeout = "100ms" + min_size = 4096 + max_size = 16384 + sizer = "bytes" + } + } + `, + expected: otelcol.QueueArguments{ + Enabled: true, + NumConsumers: 10, + QueueSize: 1000, + Sizer: "requests", + Batch: &otelcol.BatchConfig{ + FlushTimeout: 100 * time.Millisecond, + MinSize: 4096, + MaxSize: 16384, + Sizer: "bytes", + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + var args loadbalancing.Arguments + require.NoError(t, syntax.Unmarshal([]byte(tc.alloyCfg), &args)) + _, err := args.Convert() + require.NoError(t, err) + + require.Equal(t, tc.expected.Enabled, args.Queue.Enabled) + require.Equal(t, tc.expected.NumConsumers, args.Queue.NumConsumers) + require.Equal(t, tc.expected.QueueSize, args.Queue.QueueSize) + require.Equal(t, tc.expected.Sizer, args.Queue.Sizer) + require.Equal(t, tc.expected.Batch, args.Queue.Batch) + }) + } +} + +func TestProtocolQueueBatchConfig(t *testing.T) { + tests := []struct { + testName string + alloyCfg string + expected otelcol.QueueArguments + }{ + { + testName: "default", + alloyCfg: ` + resolver { + static { + hostnames = ["endpoint-1"] + } + } + protocol { + otlp { + client {} + queue { + batch {} + } + } + } + `, + expected: otelcol.QueueArguments{ + Enabled: true, + NumConsumers: 10, + QueueSize: 1000, + Sizer: "requests", + Batch: &otelcol.BatchConfig{ + FlushTimeout: 200 * time.Millisecond, + MinSize: 2000, + MaxSize: 3000, + Sizer: "items", + }, + }, + }, + { + testName: "explicit_batch", + alloyCfg: ` + resolver { + static { + hostnames = ["endpoint-1"] + } + } + protocol { + otlp { + client {} + queue { + batch { + flush_timeout = "100ms" + min_size = 4096 + max_size = 16384 + sizer = "bytes" + } + } + } + } + `, + expected: otelcol.QueueArguments{ + Enabled: true, + NumConsumers: 10, + QueueSize: 1000, + Sizer: "requests", + Batch: &otelcol.BatchConfig{ + FlushTimeout: 100 * time.Millisecond, + MinSize: 4096, + MaxSize: 16384, + Sizer: "bytes", + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + var args loadbalancing.Arguments + require.NoError(t, syntax.Unmarshal([]byte(tc.alloyCfg), &args)) + _, err := args.Convert() + require.NoError(t, err) + + require.Equal(t, tc.expected.Enabled, args.Protocol.OTLP.Queue.Enabled) + require.Equal(t, tc.expected.NumConsumers, args.Protocol.OTLP.Queue.NumConsumers) + require.Equal(t, tc.expected.QueueSize, args.Protocol.OTLP.Queue.QueueSize) + require.Equal(t, tc.expected.Sizer, args.Protocol.OTLP.Queue.Sizer) + require.Equal(t, tc.expected.Batch, args.Protocol.OTLP.Queue.Batch) + }) + } +} + func TestDebugMetricsConfig(t *testing.T) { tests := []struct { testName string diff --git a/internal/component/otelcol/exporter/otlp/otlp_test.go b/internal/component/otelcol/exporter/otlp/otlp_test.go index 929d433b375..c7f72551c21 100644 --- a/internal/component/otelcol/exporter/otlp/otlp_test.go +++ b/internal/component/otelcol/exporter/otlp/otlp_test.go @@ -145,6 +145,81 @@ func createTestTraces() ptrace.Traces { return data } +func TestQueueBatchConfig(t *testing.T) { + tests := []struct { + testName string + alloyCfg string + expected otelcol.QueueArguments + }{ + { + testName: "default", + alloyCfg: ` + client { + endpoint = "tempo:4317" + } + sending_queue { + batch {} + } + `, + expected: otelcol.QueueArguments{ + Enabled: true, + NumConsumers: 10, + QueueSize: 1000, + Sizer: "requests", + Batch: &otelcol.BatchConfig{ + FlushTimeout: 200 * time.Millisecond, + MinSize: 2000, + MaxSize: 3000, + Sizer: "items", + }, + }, + }, + { + testName: "explicit_batch", + alloyCfg: ` + client { + endpoint = "tempo:4317" + } + sending_queue { + batch { + flush_timeout = "100ms" + min_size = 4096 + max_size = 16384 + sizer = "bytes" + } + } + `, + expected: otelcol.QueueArguments{ + Enabled: true, + NumConsumers: 10, + QueueSize: 1000, + Sizer: "requests", + Batch: &otelcol.BatchConfig{ + FlushTimeout: 100 * time.Millisecond, + MinSize: 4096, + MaxSize: 16384, + Sizer: "bytes", + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + var args otlp.Arguments + require.NoError(t, syntax.Unmarshal([]byte(tc.alloyCfg), &args)) + _, err := args.Convert() + require.NoError(t, err) + + require.Equal(t, tc.expected.Enabled, args.Queue.Enabled) + require.Equal(t, tc.expected.NumConsumers, args.Queue.NumConsumers) + require.Equal(t, tc.expected.QueueSize, args.Queue.QueueSize) + require.Equal(t, tc.expected.Sizer, args.Queue.Sizer) + require.Equal(t, tc.expected.Batch, args.Queue.Batch) + }) + } +} + func TestDebugMetricsConfig(t *testing.T) { tests := []struct { testName string diff --git a/internal/component/otelcol/exporter/otlphttp/otlphttp_test.go b/internal/component/otelcol/exporter/otlphttp/otlphttp_test.go index 78ed69f983e..9d7454002d6 100644 --- a/internal/component/otelcol/exporter/otlphttp/otlphttp_test.go +++ b/internal/component/otelcol/exporter/otlphttp/otlphttp_test.go @@ -116,6 +116,81 @@ func createTestTraces() ptrace.Traces { return data } +func TestQueueBatchConfig(t *testing.T) { + tests := []struct { + testName string + alloyCfg string + expected otelcol.QueueArguments + }{ + { + testName: "default", + alloyCfg: ` + client { + endpoint = "http://tempo:4317" + } + sending_queue { + batch {} + } + `, + expected: otelcol.QueueArguments{ + Enabled: true, + NumConsumers: 10, + QueueSize: 1000, + Sizer: "requests", + Batch: &otelcol.BatchConfig{ + FlushTimeout: 200 * time.Millisecond, + MinSize: 2000, + MaxSize: 3000, + Sizer: "items", + }, + }, + }, + { + testName: "explicit_batch", + alloyCfg: ` + client { + endpoint = "http://tempo:4317" + } + sending_queue { + batch { + flush_timeout = "100ms" + min_size = 4096 + max_size = 16384 + sizer = "bytes" + } + } + `, + expected: otelcol.QueueArguments{ + Enabled: true, + NumConsumers: 10, + QueueSize: 1000, + Sizer: "requests", + Batch: &otelcol.BatchConfig{ + FlushTimeout: 100 * time.Millisecond, + MinSize: 4096, + MaxSize: 16384, + Sizer: "bytes", + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + var args otlphttp.Arguments + require.NoError(t, syntax.Unmarshal([]byte(tc.alloyCfg), &args)) + _, err := args.Convert() + require.NoError(t, err) + + require.Equal(t, tc.expected.Enabled, args.Queue.Enabled) + require.Equal(t, tc.expected.NumConsumers, args.Queue.NumConsumers) + require.Equal(t, tc.expected.QueueSize, args.Queue.QueueSize) + require.Equal(t, tc.expected.Sizer, args.Queue.Sizer) + require.Equal(t, tc.expected.Batch, args.Queue.Batch) + }) + } +} + func TestDebugMetricsConfig(t *testing.T) { tests := []struct { testName string