diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3dd896a074b..4957a64cb70 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -29,6 +29,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - Initialize map with `len(keys)` in `NewAllowKeysFilter` and `NewDenyKeysFilter` to avoid unnecessary allocations in `go.opentelemetry.io/otel/attribute`. (#6455)
 - `go.opentelemetry.io/otel/log/logtest` is now a separate Go module. (#6465)
 - `go.opentelemetry.io/otel/sdk/log/logtest` is now a separate Go module. (#6466)
+- Improve performance of `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` by not exporting when exporter cannot accept more. (#6569)
 
 ### Deprecated
 
diff --git a/sdk/log/batch.go b/sdk/log/batch.go
index 28c969262b4..dd4c7a18c98 100644
--- a/sdk/log/batch.go
+++ b/sdk/log/batch.go
@@ -156,13 +156,18 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
 				global.Warn("dropped log records", "dropped", d)
 			}
 
-			qLen := b.q.TryDequeue(buf, func(r []Record) bool {
-				ok := b.exporter.EnqueueExport(r)
-				if ok {
-					buf = slices.Clone(buf)
-				}
-				return ok
-			})
+			qLen := b.q.Len()
+			// Don't copy data from queue unless exporter can accept more; it is very expensive.
+			if b.exporter.Ready() {
+				qLen = b.q.TryDequeue(buf, func(r []Record) bool {
+					ok := b.exporter.EnqueueExport(r)
+					if ok {
+						buf = slices.Clone(buf)
+					}
+					return ok
+				})
+			}
+
 			if qLen >= b.batchSize {
 				// There is another full batch ready. Immediately trigger
 				// another export attempt.
@@ -272,6 +277,13 @@ func newQueue(size int) *queue {
 	}
 }
 
+func (q *queue) Len() int {
+	q.Lock()
+	defer q.Unlock()
+
+	return q.len
+}
+
 // Dropped returns the number of Records dropped during enqueueing since the
 // last time Dropped was called.
 func (q *queue) Dropped() uint64 {
diff --git a/sdk/log/bench_test.go b/sdk/log/bench_test.go
index 835f68c7aba..74689d7f387 100644
--- a/sdk/log/bench_test.go
+++ b/sdk/log/bench_test.go
@@ -13,6 +13,17 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
+type mockDelayExporter struct{}
+
+func (mockDelayExporter) Export(context.Context, []Record) error {
+	time.Sleep(time.Millisecond * 5)
+	return nil
+}
+
+func (mockDelayExporter) Shutdown(context.Context) error { return nil }
+
+func (mockDelayExporter) ForceFlush(context.Context) error { return nil }
+
 func BenchmarkProcessor(b *testing.B) {
 	for _, tc := range []struct {
 		name string
@@ -30,6 +41,12 @@ func BenchmarkProcessor(b *testing.B) {
 				return []LoggerProviderOption{WithProcessor(NewBatchProcessor(noopExporter{}))}
 			},
 		},
+		{
+			name: "BatchSimulateExport",
+			f: func() []LoggerProviderOption {
+				return []LoggerProviderOption{WithProcessor(NewBatchProcessor(mockDelayExporter{}))}
+			},
+		},
 		{
 			name: "SetTimestampSimple",
 			f: func() []LoggerProviderOption {
diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go
index e4e3c5402bf..8cef5dde6b5 100644
--- a/sdk/log/exporter.go
+++ b/sdk/log/exporter.go
@@ -186,11 +186,10 @@ type bufferExporter struct {
 
 // newBufferExporter returns a new bufferExporter that wraps exporter. The
 // returned bufferExporter will buffer at most size number of export requests.
-// If size is less than zero, zero will be used (i.e. only synchronous
-// exporting will be supported).
+// If size is less than 1, 1 will be used.
 func newBufferExporter(exporter Exporter, size int) *bufferExporter {
-	if size < 0 {
-		size = 0
+	if size < 1 {
+		size = 1
 	}
 	input := make(chan exportData, size)
 	return &bufferExporter{
@@ -201,6 +200,10 @@ func newBufferExporter(exporter Exporter, size int) *bufferExporter {
 	}
 }
 
+func (e *bufferExporter) Ready() bool {
+	return len(e.input) != cap(e.input)
+}
+
 var errStopped = errors.New("exporter stopped")
 
 func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan<- error) error {