Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ description: Shared content, otelcol queue batch block
headless: true
---

Batching is disabled by default.
To enable it, explicitly include `batch {}` in your Alloy configuration.
You do not need to include a `batch {}` block in your `otelcol.exporter` if you already use a `otelcol.processor.batch` component,
although batching in the exporter is the prefered method because it is more flexible.

The following arguments are supported:

| Name | Type | Description | Default | Required |
| --------------- | ----------- | ---------------------------------------------------------------------------------------------------------- | ------------ | -------- |
| `flush_timeout` | `duration` | Time after which a batch will be sent regardless of its size. Must be a non-zero value. | `"200ms"` | no |
| `min_size` | `number` | The minimum size of a batch. | `8192` | no |
| `max_size` | `number` | The maximum size of a batch, enables batch splitting. | | no |
| `min_size` | `number` | The minimum size of a batch. | `2000` | no |
| `max_size` | `number` | The maximum size of a batch, enables batch splitting. | `3000` | no |
| `sizer` | `string` | How the queue and batching is measured. Overrides the sizer set at the `sending_queue` level for batching. | `"items"` | no |

If configured, `max_size` must be greater than or equal to `min_size`.
Expand Down
12 changes: 12 additions & 0 deletions internal/component/otelcol/config_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,20 @@ type BatchConfig struct {
Sizer string `alloy:"sizer,attr,optional"`
}

var _ syntax.Defaulter = (*BatchConfig)(nil)

var defaultBatchConfig = otelexporterhelper.NewDefaultQueueConfig().Batch

// SetToDefault implements syntax.Defaulter.
func (args *BatchConfig) SetToDefault() {
*args = BatchConfig{
FlushTimeout: 200 * time.Millisecond,
MinSize: 2000,
MaxSize: 3000,
Sizer: "items",
}
}

// Validate returns an error if args is invalid.
func (args *BatchConfig) Validate() error {
if args == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,184 @@ func TestConfigConversion(t *testing.T) {
}
}

func TestQueueBatchConfig(t *testing.T) {
tests := []struct {
testName string
alloyCfg string
expected otelcol.QueueArguments
}{
{
testName: "default",
alloyCfg: `
resolver {
static {
hostnames = ["endpoint-1"]
}
}
protocol {
otlp {
client {}
}
}
sending_queue {
batch {}
}
`,
expected: otelcol.QueueArguments{
Enabled: true,
NumConsumers: 10,
QueueSize: 1000,
Sizer: "requests",
Batch: &otelcol.BatchConfig{
FlushTimeout: 200 * time.Millisecond,
MinSize: 2000,
MaxSize: 3000,
Sizer: "items",
},
},
},
{
testName: "explicit_batch",
alloyCfg: `
resolver {
static {
hostnames = ["endpoint-1"]
}
}
protocol {
otlp {
client {}
}
}
sending_queue {
batch {
flush_timeout = "100ms"
min_size = 4096
max_size = 16384
sizer = "bytes"
}
}
`,
expected: otelcol.QueueArguments{
Enabled: true,
NumConsumers: 10,
QueueSize: 1000,
Sizer: "requests",
Batch: &otelcol.BatchConfig{
FlushTimeout: 100 * time.Millisecond,
MinSize: 4096,
MaxSize: 16384,
Sizer: "bytes",
},
},
},
}

for _, tc := range tests {
t.Run(tc.testName, func(t *testing.T) {
var args loadbalancing.Arguments
require.NoError(t, syntax.Unmarshal([]byte(tc.alloyCfg), &args))
_, err := args.Convert()
require.NoError(t, err)

require.Equal(t, tc.expected.Enabled, args.Queue.Enabled)
require.Equal(t, tc.expected.NumConsumers, args.Queue.NumConsumers)
require.Equal(t, tc.expected.QueueSize, args.Queue.QueueSize)
require.Equal(t, tc.expected.Sizer, args.Queue.Sizer)
require.Equal(t, tc.expected.Batch, args.Queue.Batch)
})
}
}

func TestProtocolQueueBatchConfig(t *testing.T) {
tests := []struct {
testName string
alloyCfg string
expected otelcol.QueueArguments
}{
{
testName: "default",
alloyCfg: `
resolver {
static {
hostnames = ["endpoint-1"]
}
}
protocol {
otlp {
client {}
queue {
batch {}
}
}
}
`,
expected: otelcol.QueueArguments{
Enabled: true,
NumConsumers: 10,
QueueSize: 1000,
Sizer: "requests",
Batch: &otelcol.BatchConfig{
FlushTimeout: 200 * time.Millisecond,
MinSize: 2000,
MaxSize: 3000,
Sizer: "items",
},
},
},
{
testName: "explicit_batch",
alloyCfg: `
resolver {
static {
hostnames = ["endpoint-1"]
}
}
protocol {
otlp {
client {}
queue {
batch {
flush_timeout = "100ms"
min_size = 4096
max_size = 16384
sizer = "bytes"
}
}
}
}
`,
expected: otelcol.QueueArguments{
Enabled: true,
NumConsumers: 10,
QueueSize: 1000,
Sizer: "requests",
Batch: &otelcol.BatchConfig{
FlushTimeout: 100 * time.Millisecond,
MinSize: 4096,
MaxSize: 16384,
Sizer: "bytes",
},
},
},
}

for _, tc := range tests {
t.Run(tc.testName, func(t *testing.T) {
var args loadbalancing.Arguments
require.NoError(t, syntax.Unmarshal([]byte(tc.alloyCfg), &args))
_, err := args.Convert()
require.NoError(t, err)

require.Equal(t, tc.expected.Enabled, args.Protocol.OTLP.Queue.Enabled)
require.Equal(t, tc.expected.NumConsumers, args.Protocol.OTLP.Queue.NumConsumers)
require.Equal(t, tc.expected.QueueSize, args.Protocol.OTLP.Queue.QueueSize)
require.Equal(t, tc.expected.Sizer, args.Protocol.OTLP.Queue.Sizer)
require.Equal(t, tc.expected.Batch, args.Protocol.OTLP.Queue.Batch)
})
}
}

func TestDebugMetricsConfig(t *testing.T) {
tests := []struct {
testName string
Expand Down
75 changes: 75 additions & 0 deletions internal/component/otelcol/exporter/otlp/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,81 @@ func createTestTraces() ptrace.Traces {
return data
}

func TestQueueBatchConfig(t *testing.T) {
tests := []struct {
testName string
alloyCfg string
expected otelcol.QueueArguments
}{
{
testName: "default",
alloyCfg: `
client {
endpoint = "tempo:4317"
}
sending_queue {
batch {}
}
`,
expected: otelcol.QueueArguments{
Enabled: true,
NumConsumers: 10,
QueueSize: 1000,
Sizer: "requests",
Batch: &otelcol.BatchConfig{
FlushTimeout: 200 * time.Millisecond,
MinSize: 2000,
MaxSize: 3000,
Sizer: "items",
},
},
},
{
testName: "explicit_batch",
alloyCfg: `
client {
endpoint = "tempo:4317"
}
sending_queue {
batch {
flush_timeout = "100ms"
min_size = 4096
max_size = 16384
sizer = "bytes"
}
}
`,
expected: otelcol.QueueArguments{
Enabled: true,
NumConsumers: 10,
QueueSize: 1000,
Sizer: "requests",
Batch: &otelcol.BatchConfig{
FlushTimeout: 100 * time.Millisecond,
MinSize: 4096,
MaxSize: 16384,
Sizer: "bytes",
},
},
},
}

for _, tc := range tests {
t.Run(tc.testName, func(t *testing.T) {
var args otlp.Arguments
require.NoError(t, syntax.Unmarshal([]byte(tc.alloyCfg), &args))
_, err := args.Convert()
require.NoError(t, err)

require.Equal(t, tc.expected.Enabled, args.Queue.Enabled)
require.Equal(t, tc.expected.NumConsumers, args.Queue.NumConsumers)
require.Equal(t, tc.expected.QueueSize, args.Queue.QueueSize)
require.Equal(t, tc.expected.Sizer, args.Queue.Sizer)
require.Equal(t, tc.expected.Batch, args.Queue.Batch)
})
}
}

func TestDebugMetricsConfig(t *testing.T) {
tests := []struct {
testName string
Expand Down
75 changes: 75 additions & 0 deletions internal/component/otelcol/exporter/otlphttp/otlphttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,81 @@ func createTestTraces() ptrace.Traces {
return data
}

func TestQueueBatchConfig(t *testing.T) {
tests := []struct {
testName string
alloyCfg string
expected otelcol.QueueArguments
}{
{
testName: "default",
alloyCfg: `
client {
endpoint = "http://tempo:4317"
}
sending_queue {
batch {}
}
`,
expected: otelcol.QueueArguments{
Enabled: true,
NumConsumers: 10,
QueueSize: 1000,
Sizer: "requests",
Batch: &otelcol.BatchConfig{
FlushTimeout: 200 * time.Millisecond,
MinSize: 2000,
MaxSize: 3000,
Sizer: "items",
},
},
},
{
testName: "explicit_batch",
alloyCfg: `
client {
endpoint = "http://tempo:4317"
}
sending_queue {
batch {
flush_timeout = "100ms"
min_size = 4096
max_size = 16384
sizer = "bytes"
}
}
`,
expected: otelcol.QueueArguments{
Enabled: true,
NumConsumers: 10,
QueueSize: 1000,
Sizer: "requests",
Batch: &otelcol.BatchConfig{
FlushTimeout: 100 * time.Millisecond,
MinSize: 4096,
MaxSize: 16384,
Sizer: "bytes",
},
},
},
}

for _, tc := range tests {
t.Run(tc.testName, func(t *testing.T) {
var args otlphttp.Arguments
require.NoError(t, syntax.Unmarshal([]byte(tc.alloyCfg), &args))
_, err := args.Convert()
require.NoError(t, err)

require.Equal(t, tc.expected.Enabled, args.Queue.Enabled)
require.Equal(t, tc.expected.NumConsumers, args.Queue.NumConsumers)
require.Equal(t, tc.expected.QueueSize, args.Queue.QueueSize)
require.Equal(t, tc.expected.Sizer, args.Queue.Sizer)
require.Equal(t, tc.expected.Batch, args.Queue.Batch)
})
}
}

func TestDebugMetricsConfig(t *testing.T) {
tests := []struct {
testName string
Expand Down
Loading