Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
51e7d42
fix(loadbalancingexporter): batch logs after routing
amir-jakoby Mar 19, 2026
88e7238
docs(changelog): add saw-6744 release note
amir-jakoby Mar 19, 2026
9d29002
fix(loadbalancingexporter): synchronize backend exporter updates
amir-jakoby Mar 19, 2026
a4fe5ce
fix(loadbalancingexporter): avoid stale backend enqueues
amir-jakoby Mar 19, 2026
098c545
fix(loadbalancingexporter): handle batched log backpressure
amir-jakoby Mar 19, 2026
ac13582
fix(loadbalancingexporter): satisfy batcher lint checks
amir-jakoby Mar 19, 2026
54e5df2
chore: refresh tidylist
amir-jakoby Mar 19, 2026
a28df80
fix(loadbalancingexporter): narrow batcher removal critical sections
amir-jakoby Mar 19, 2026
ff8eb14
chore: refresh go module tidy state
amir-jakoby Mar 19, 2026
7435dac
fix(loadbalancingexporter): drain inflight logs on shutdown
amir-jakoby Mar 19, 2026
68d17aa
fix(loadbalancingexporter): wire queue payload codec via queue batch …
amir-jakoby Mar 19, 2026
aa10b55
perf(loadbalancingexporter): avoid O(n²) proto re-serialization in lo…
amir-jakoby Mar 19, 2026
6c33b51
fix(loadbalancingexporter): honor shutdown context while draining
amir-jakoby Mar 19, 2026
d8ab285
fix(loadbalancingexporter): preserve queue compression semantics
amir-jakoby Mar 19, 2026
d9b0f89
fix(loadbalancingexporter): remove dead queue codec wrapper
amir-jakoby Mar 19, 2026
9cce6e7
fix(loadbalancingexporter): restore queue payload codec support
amir-jakoby Mar 19, 2026
67fe987
fix(loadbalancingexporter): harden exporter shutdown races
amir-jakoby Mar 19, 2026
667c684
fix(loadbalancingexporter): deduplicate resource/scope in per-endpoin…
sawmills-architect-review[bot] Mar 19, 2026
d67af0e
fix(loadbalancingexporter): restore direct consume semantics
amir-jakoby Mar 19, 2026
f02c544
fix(loadbalancingexporter): surface stopping-enqueue failures
amir-jakoby Mar 19, 2026
e9c7944
fix(loadbalancingexporter): avoid duplicate enqueue errors
amir-jakoby Mar 19, 2026
000f5ee
fix(loadbalancingexporter): finish CI sweep follow-ups
amir-jakoby Mar 19, 2026
bcd8e2d
fix(loadbalancingexporter): hoist CompressInMemory unsupported check …
amir-jakoby Mar 19, 2026
6b936fe
fix(loadbalancingexporter): address follow-up review comments
amir-jakoby Mar 19, 2026
e77a91f
fix(hotreloadprocessor): clear package lint failures
amir-jakoby Mar 19, 2026
e216623
fix(ci): normalize import formatting
amir-jakoby Mar 19, 2026
7512d05
fix(logstometricsprocessor): clear lint debt
amir-jakoby Mar 19, 2026
4c8d398
fix(loadbalancingexporter): preserve log batching semantics
amir-jakoby Mar 19, 2026
482352f
build(loadbalancingexporter): tidy module deps
amir-jakoby Mar 19, 2026
102660e
perf(loadbalancingexporter): reduce log batching hot-path work
amir-jakoby Mar 19, 2026
d33293a
build(distributions): regenerate contrib report
amir-jakoby Mar 19, 2026
a63a280
fix(loadbalancingexporter): bound batched enqueue coalescing
amir-jakoby Mar 19, 2026
8fbcbf1
fix(hotreloadprocessor): sort generated metadata
amir-jakoby Mar 19, 2026
7e1a51a
refactor(loadbalancingexporter): drop redundant copied fields
amir-jakoby Mar 19, 2026
b9ac902
fix(loadbalancingexporter): reject log sends during shutdown
amir-jakoby Mar 19, 2026
e5bc61a
build(hotreloadprocessor): tidy generated test deps
amir-jakoby Mar 19, 2026
274e77c
build(logstometricsprocessor): regenerate metadata outputs
amir-jakoby Mar 19, 2026
b082915
build: add hotreloadprocessor to module sets
amir-jakoby Mar 19, 2026
3f4e15a
build: regenerate issue templates
amir-jakoby Mar 19, 2026
824555f
fix(loadbalancingexporter): stop batcher backend creation after shutdown
amir-jakoby Mar 19, 2026
504e89f
refactor(loadbalancingexporter): inline exporter lookup helper
amir-jakoby Mar 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .chloggen/saw-6744-loadbalancing-log-batcher.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
change_type: enhancement
component: exporter/loadbalancing
note: Add an optional post-routing log batcher to the loadbalancing exporter with per-backend buffering and configurable flush limits.
issues: [6744]
subtext: |
The new `log_batcher` config is disabled by default for backward compatibility.
When enabled, logs are batched per resolved backend before downstream queue compression.
change_logs: [user]
8 changes: 8 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,10 @@ component_management:
name: processor_groupbytrace
paths:
- processor/groupbytraceprocessor/**
- component_id: processor_hotreload
name: processor_hotreload
paths:
- processor/hotreloadprocessor/**
- component_id: processor_interval
name: processor_interval
paths:
Expand All @@ -496,6 +500,10 @@ component_management:
name: processor_logdedup
paths:
- processor/logdedupprocessor/**
- component_id: processor_logstometrics
name: processor_logstometrics
paths:
- processor/logstometricsprocessor/**
- component_id: processor_logstransform
name: processor_logstransform
paths:
Expand Down
2 changes: 2 additions & 0 deletions .github/ISSUE_TEMPLATE/beta_stability.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,12 @@ body:
- processor/geoip
- processor/groupbyattrs
- processor/groupbytrace
- processor/hotreload
- processor/interval
- processor/isolationforest
- processor/k8sattributes
- processor/logdedup
- processor/logstometrics
- processor/logstransform
- processor/metricsgeneration
- processor/metricstarttime
Expand Down
2 changes: 2 additions & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,12 @@ body:
- processor/geoip
- processor/groupbyattrs
- processor/groupbytrace
- processor/hotreload
- processor/interval
- processor/isolationforest
- processor/k8sattributes
- processor/logdedup
- processor/logstometrics
- processor/logstransform
- processor/metricsgeneration
- processor/metricstarttime
Expand Down
2 changes: 2 additions & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,12 @@ body:
- processor/geoip
- processor/groupbyattrs
- processor/groupbytrace
- processor/hotreload
- processor/interval
- processor/isolationforest
- processor/k8sattributes
- processor/logdedup
- processor/logstometrics
- processor/logstransform
- processor/metricsgeneration
- processor/metricstarttime
Expand Down
2 changes: 2 additions & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,12 @@ body:
- processor/geoip
- processor/groupbyattrs
- processor/groupbytrace
- processor/hotreload
- processor/interval
- processor/isolationforest
- processor/k8sattributes
- processor/logdedup
- processor/logstometrics
- processor/logstransform
- processor/metricsgeneration
- processor/metricstarttime
Expand Down
2 changes: 2 additions & 0 deletions .github/ISSUE_TEMPLATE/unmaintained.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,12 @@ body:
- processor/geoip
- processor/groupbyattrs
- processor/groupbytrace
- processor/hotreload
- processor/interval
- processor/isolationforest
- processor/k8sattributes
- processor/logdedup
- processor/logstometrics
- processor/logstransform
- processor/metricsgeneration
- processor/metricstarttime
Expand Down
4 changes: 2 additions & 2 deletions cmd/golden/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions connector/otlpjsonconnector/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions connector/servicegraphconnector/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions exporter/loadbalancingexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
* `streamID`: Routes metrics based on their datapoint streamID. That's the unique hash of all it's attributes, plus the attributes and identifying information of its resource, scope, and metric data
* loadbalancing exporter supports set of standard [queuing, retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), but they are disable by default to maintain compatibility
* The `routing_attributes` property is used to list the attributes that should be used if the `routing_key` is `attributes`.
* The `log_batcher` property enables post-routing log batching per backend. It is `disabled` by default for backward compatibility.
* `enabled` turns post-routing log batching on or off.
* `max_records` flushes a backend batch when it reaches this many log records. Default: `512`.
* `max_bytes` flushes a backend batch when its serialized OTLP payload size before compression reaches this many bytes. Default: `1048576` (`1 MiB`).
* `flush_interval` flushes a backend batch after this interval even if size limits are not reached. Default: `100ms`.

Simple example

Expand All @@ -138,6 +143,11 @@ processors:

exporters:
loadbalancing:
log_batcher:
enabled: true
max_records: 512
max_bytes: 1048576
flush_interval: 100ms
routing_key: "service"
protocol:
otlp:
Expand Down Expand Up @@ -236,17 +246,18 @@ service:
- loadbalancing
```

To compress payloads in an in-memory sending queue, set:
To compress queued payloads before they are written to persistent queue storage, set:

```yaml
exporters:
loadbalancing:
sending_queue:
enabled: true
payload_compression: zstd
compress_in_memory: true
```

`sending_queue.compress_in_memory` is currently not supported by this exporter helper version. If set, config validation fails instead of silently ignoring it.

Kubernetes resolver example (For a more specific example: [example/k8s-resolver](./example/k8s-resolver/README.md))
> [!IMPORTANT]
> The k8s resolver requires proper permissions. See [the full example](./example/k8s-resolver/README.md) for more information.
Expand Down
38 changes: 31 additions & 7 deletions exporter/loadbalancingexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ const (
type Config struct {
TimeoutSettings exporterhelper.TimeoutConfig `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
QueueSettings QueueSettings `mapstructure:"sending_queue"`
QueueSettings QueueSettings `mapstructure:"sending_queue"`
LogBatcher LogBatcherConfig `mapstructure:"log_batcher"`
Comment thread
amir-jakoby marked this conversation as resolved.

Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
Expand All @@ -61,6 +62,13 @@ type QueueSettings struct {
CompressInMemory bool `mapstructure:"compress_in_memory"`
}

type LogBatcherConfig struct {
Enabled bool `mapstructure:"enabled"`
MaxRecords int `mapstructure:"max_records"`
MaxBytes int `mapstructure:"max_bytes"`
FlushInterval time.Duration `mapstructure:"flush_interval"`
}

func (q *QueueSettings) Unmarshal(conf *confmap.Conf) error {
if conf == nil {
return nil
Expand Down Expand Up @@ -117,18 +125,34 @@ func (q QueueSettings) Validate() error {
return fmt.Errorf("sending_queue.payload_compression must be one of [none, snappy, zstd], found %q", q.PayloadCompression)
}

if q.CompressInMemory && !q.Enabled {
return errors.New("sending_queue.compress_in_memory requires sending_queue.enabled=true")
}
if q.CompressInMemory && (q.PayloadCompression == "" || q.PayloadCompression == QueuePayloadCompressionNone) {
return errors.New("sending_queue.compress_in_memory requires sending_queue.payload_compression to be set to snappy or zstd")
if q.CompressInMemory {
return errors.New("sending_queue.compress_in_memory is not supported by this exporter helper version; use sending_queue.payload_compression instead")
}
Comment thread
amir-jakoby marked this conversation as resolved.

return nil
}

func (cfg *Config) Validate() error {
return cfg.QueueSettings.Validate()
if err := cfg.QueueSettings.Validate(); err != nil {
return err
}
return cfg.LogBatcher.Validate()
}

func (c LogBatcherConfig) Validate() error {
if !c.Enabled {
return nil
}
if c.MaxRecords <= 0 {
return errors.New("log_batcher.max_records must be greater than 0 when log_batcher.enabled=true")
}
if c.MaxBytes <= 0 {
return errors.New("log_batcher.max_bytes must be greater than 0 when log_batcher.enabled=true")
}
if c.FlushInterval <= 0 {
return errors.New("log_batcher.flush_interval must be greater than 0 when log_batcher.enabled=true")
}
return nil
}

// Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment.
Expand Down
58 changes: 55 additions & 3 deletions exporter/loadbalancingexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package loadbalancingexporter
import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -43,14 +44,34 @@ func TestConfigValidatePayloadCompression(t *testing.T) {
}

func TestConfigValidateCompressInMemory(t *testing.T) {
// compress_in_memory is unsupported; any use must fail immediately regardless of other settings.
cfg := createDefaultConfig().(*Config)
cfg.QueueSettings.CompressInMemory = true
require.ErrorContains(t, cfg.Validate(), "sending_queue.compress_in_memory requires sending_queue.enabled=true")
require.ErrorContains(t, cfg.Validate(), "sending_queue.compress_in_memory is not supported")

// Same error even when enabled=true and payload_compression is set.
cfg.QueueSettings.Enabled = true
require.ErrorContains(t, cfg.Validate(), "sending_queue.compress_in_memory requires sending_queue.payload_compression")

cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionSnappy
require.ErrorContains(t, cfg.Validate(), "sending_queue.compress_in_memory is not supported")
}

func TestConfigValidateLogBatcher(t *testing.T) {
cfg := createDefaultConfig().(*Config)
require.NoError(t, cfg.Validate())

cfg.LogBatcher.Enabled = true
cfg.LogBatcher.MaxRecords = 0
require.ErrorContains(t, cfg.Validate(), "log_batcher.max_records")

cfg.LogBatcher.MaxRecords = 10
cfg.LogBatcher.MaxBytes = 0
require.ErrorContains(t, cfg.Validate(), "log_batcher.max_bytes")

cfg.LogBatcher.MaxBytes = 1024
cfg.LogBatcher.FlushInterval = 0
require.ErrorContains(t, cfg.Validate(), "log_batcher.flush_interval")

cfg.LogBatcher.FlushInterval = time.Second
require.NoError(t, cfg.Validate())
}

Expand Down Expand Up @@ -83,3 +104,34 @@ func TestLoadConfigWithQueueCompression(t *testing.T) {
require.Equal(t, QueuePayloadCompressionZstd, cfg.QueueSettings.PayloadCompression)
require.True(t, cfg.QueueSettings.CompressInMemory)
}

func TestLoadConfigWithLogBatcher(t *testing.T) {
cfg := createDefaultConfig().(*Config)
conf := confmap.NewFromStringMap(map[string]any{
"protocol": map[string]any{
"otlp": map[string]any{
"endpoint": "localhost:4317",
"tls": map[string]any{
"insecure": true,
},
},
},
"resolver": map[string]any{
"static": map[string]any{
"hostnames": []string{"localhost:4317"},
},
},
"log_batcher": map[string]any{
"enabled": true,
"max_records": 1024,
"max_bytes": 2097152,
"flush_interval": "250ms",
},
})

require.NoError(t, conf.Unmarshal(cfg))
require.True(t, cfg.LogBatcher.Enabled)
require.Equal(t, 1024, cfg.LogBatcher.MaxRecords)
require.Equal(t, 2097152, cfg.LogBatcher.MaxBytes)
require.Equal(t, 250*time.Millisecond, cfg.LogBatcher.FlushInterval)
}
Loading
Loading