From 8e9863a5b7d2a5dabd7918900d80193da95cf662 Mon Sep 17 00:00:00 2001 From: wmdanor Date: Fri, 28 Mar 2025 01:44:33 +0000 Subject: [PATCH 01/10] why --- sdk/log/batch.go | 25 ++++++++++++++++++------- sdk/log/bench_test.go | 17 +++++++++++++++++ sdk/log/exporter.go | 8 ++++++-- 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 28c969262b4..33373e4b706 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -156,13 +156,17 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { global.Warn("dropped log records", "dropped", d) } - qLen := b.q.TryDequeue(buf, func(r []Record) bool { - ok := b.exporter.EnqueueExport(r) - if ok { - buf = slices.Clone(buf) - } - return ok - }) + qLen := b.q.Len() + // don't copy data from queue unless exporter can accept more, it is very expensive + if !b.exporter.IsQueueFull() { + qLen = b.q.TryDequeue(buf, func(r []Record) bool { + ok := b.exporter.EnqueueExport(r) + if ok { + buf = slices.Clone(buf) + } + return ok + }) + } if qLen >= b.batchSize { // There is another full batch ready. Immediately trigger // another export attempt. @@ -272,6 +276,13 @@ func newQueue(size int) *queue { } } +func (q *queue) Len() int { + q.Lock() + defer q.Unlock() + + return q.len +} + // Dropped returns the number of Records dropped during enqueueing since the // last time Dropped was called. func (q *queue) Dropped() uint64 { diff --git a/sdk/log/bench_test.go b/sdk/log/bench_test.go index 835f68c7aba..74689d7f387 100644 --- a/sdk/log/bench_test.go +++ b/sdk/log/bench_test.go @@ -13,6 +13,17 @@ import ( "github.com/stretchr/testify/assert" ) +type mockDelayExporter struct{} + +func (mockDelayExporter) Export(context.Context, []Record) error { + time.Sleep(time.Millisecond * 5) + return nil +} + +func (mockDelayExporter) Shutdown(context.Context) error { return nil } + +func (mockDelayExporter) ForceFlush(context.Context) error { return nil } + func BenchmarkProcessor(b *testing.B) { for _, tc := range []struct { name string @@ -30,6 +41,12 @@ func BenchmarkProcessor(b *testing.B) { return []LoggerProviderOption{WithProcessor(NewBatchProcessor(noopExporter{}))} }, }, + { + name: "BatchSimulateExport", + f: func() []LoggerProviderOption { + return []LoggerProviderOption{WithProcessor(NewBatchProcessor(mockDelayExporter{}))} + }, + }, { name: "SetTimestampSimple", f: func() []LoggerProviderOption { diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index e4e3c5402bf..ad9072d99c8 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -189,8 +189,8 @@ type bufferExporter struct { // If size is less than zero, zero will be used (i.e. only synchronous // exporting will be supported). func newBufferExporter(exporter Exporter, size int) *bufferExporter { - if size < 0 { - size = 0 + if size < 1 { + size = 1 } input := make(chan exportData, size) return &bufferExporter{ @@ -201,6 +201,10 @@ func newBufferExporter(exporter Exporter, size int) *bufferExporter { } } +func (e *bufferExporter) IsQueueFull() bool { + return len(e.input) == cap(e.input) +} + var errStopped = errors.New("exporter stopped") func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan<- error) error { From 93b55ea0761ae146408424515a72d4574ee816c9 Mon Sep 17 00:00:00 2001 From: wmdanor Date: Fri, 28 Mar 2025 02:24:06 +0000 Subject: [PATCH 02/10] update comment --- sdk/log/exporter.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index ad9072d99c8..f458a622b41 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -186,8 +186,7 @@ type bufferExporter struct { // newBufferExporter returns a new bufferExporter that wraps exporter. The // returned bufferExporter will buffer at most size number of export requests. -// If size is less than zero, zero will be used (i.e. only synchronous -// exporting will be supported). +// If size is less than 1, 1 will be used func newBufferExporter(exporter Exporter, size int) *bufferExporter { if size < 1 { size = 1 From 15b3f0e4be8ea1ef7d136f4af6c2d5008d174f20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Fri, 28 Mar 2025 10:18:07 +0100 Subject: [PATCH 03/10] Update exporter.go --- sdk/log/exporter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index f458a622b41..166a773ad92 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -186,7 +186,7 @@ type bufferExporter struct { // newBufferExporter returns a new bufferExporter that wraps exporter. The // returned bufferExporter will buffer at most size number of export requests. -// If size is less than 1, 1 will be used +// If size is less than 1, 1 will be used. func newBufferExporter(exporter Exporter, size int) *bufferExporter { if size < 1 { size = 1 From 996346b10e60cfd80f66806d215e0dc02f51ad30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Fri, 28 Mar 2025 10:26:42 +0100 Subject: [PATCH 04/10] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 15ffa7a7277..8e8cac6c998 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Initialize map with `len(keys)` in `NewAllowKeysFilter` and `NewDenyKeysFilter` to avoid unnecessary allocations in `go.opentelemetry.io/otel/attribute`. (#6455) - `go.opentelemetry.io/otel/log/logtest` is now a separate Go module. (#6465) - `go.opentelemetry.io/otel/sdk/log/logtest` is now a separate Go module. (#6466) +- `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` now drops the most recently emitted log records instead of the oldest ones when the queue reaches its capacity. (#6569) ### Deprecated From 2c7d8a6fe37503d0fec3ca04367cbbca608c9e1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Fri, 28 Mar 2025 11:05:20 +0100 Subject: [PATCH 05/10] Update CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e8cac6c998..21e9e6d00c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,7 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Initialize map with `len(keys)` in `NewAllowKeysFilter` and `NewDenyKeysFilter` to avoid unnecessary allocations in `go.opentelemetry.io/otel/attribute`. (#6455) - `go.opentelemetry.io/otel/log/logtest` is now a separate Go module. (#6465) - `go.opentelemetry.io/otel/sdk/log/logtest` is now a separate Go module. (#6466) -- `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` now drops the most recently emitted log records instead of the oldest ones when the queue reaches its capacity. (#6569) +- Improve peroformance of `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` by not exporting when exporter cannot accept more. (#6569) ### Deprecated From 0f8bc544c3ab7a3fd0a75b96d476a3a995b43ecc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Fri, 28 Mar 2025 11:30:45 +0100 Subject: [PATCH 06/10] Update batch.go --- sdk/log/batch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 33373e4b706..a419bfb1ee3 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -157,7 +157,7 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { } qLen := b.q.Len() - // don't copy data from queue unless exporter can accept more, it is very expensive + // Don't copy data from queue unless exporter can accept more, it is very expensive. if !b.exporter.IsQueueFull() { qLen = b.q.TryDequeue(buf, func(r []Record) bool { ok := b.exporter.EnqueueExport(r) From be4cc3d3bf2de8b292f7e6f43ab41ce6c1af312e Mon Sep 17 00:00:00 2001 From: wmdanor Date: Fri, 28 Mar 2025 17:39:53 +0000 Subject: [PATCH 07/10] fix typo --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 21e9e6d00c8..6a767e46cf3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,7 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Initialize map with `len(keys)` in `NewAllowKeysFilter` and `NewDenyKeysFilter` to avoid unnecessary allocations in `go.opentelemetry.io/otel/attribute`. (#6455) - `go.opentelemetry.io/otel/log/logtest` is now a separate Go module. (#6465) - `go.opentelemetry.io/otel/sdk/log/logtest` is now a separate Go module. (#6466) -- Improve peroformance of `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` by not exporting when exporter cannot accept more. (#6569) +- Improve performance of `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` by not exporting when exporter cannot accept more. (#6569) ### Deprecated From 3a8fb6b0eca70d67734c08d8c6687cd29a8f496a Mon Sep 17 00:00:00 2001 From: wmdanor Date: Fri, 28 Mar 2025 17:47:53 +0000 Subject: [PATCH 08/10] change skip behaviour --- sdk/log/batch.go | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index a419bfb1ee3..d8dd059ff0a 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -156,17 +156,18 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { global.Warn("dropped log records", "dropped", d) } - qLen := b.q.Len() // Don't copy data from queue unless exporter can accept more, it is very expensive. - if !b.exporter.IsQueueFull() { - qLen = b.q.TryDequeue(buf, func(r []Record) bool { - ok := b.exporter.EnqueueExport(r) - if ok { - buf = slices.Clone(buf) - } - return ok - }) + if b.exporter.IsQueueFull() { + continue } + + qLen := b.q.TryDequeue(buf, func(r []Record) bool { + ok := b.exporter.EnqueueExport(r) + if ok { + buf = slices.Clone(buf) + } + return ok + }) if qLen >= b.batchSize { // There is another full batch ready. Immediately trigger // another export attempt. @@ -276,13 +277,6 @@ func newQueue(size int) *queue { } } -func (q *queue) Len() int { - q.Lock() - defer q.Unlock() - - return q.len -} - // Dropped returns the number of Records dropped during enqueueing since the // last time Dropped was called. func (q *queue) Dropped() uint64 { From 12038f5a73fadf4c5eba99b4d5660acaa9afa42f Mon Sep 17 00:00:00 2001 From: wmdanor Date: Fri, 28 Mar 2025 18:28:03 +0000 Subject: [PATCH 09/10] revert skip behaviour --- sdk/log/batch.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index d8dd059ff0a..f6d9d6f9d19 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -156,18 +156,18 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { global.Warn("dropped log records", "dropped", d) } + qLen := b.q.Len() // Don't copy data from queue unless exporter can accept more, it is very expensive. - if b.exporter.IsQueueFull() { - continue + if !b.exporter.IsQueueFull() { + qLen = b.q.TryDequeue(buf, func(r []Record) bool { + ok := b.exporter.EnqueueExport(r) + if ok { + buf = slices.Clone(buf) + } + return ok + }) } - qLen := b.q.TryDequeue(buf, func(r []Record) bool { - ok := b.exporter.EnqueueExport(r) - if ok { - buf = slices.Clone(buf) - } - return ok - }) if qLen >= b.batchSize { // There is another full batch ready. Immediately trigger // another export attempt. @@ -277,6 +277,13 @@ func newQueue(size int) *queue { } } +func (q *queue) Len() int { + q.Lock() + defer q.Unlock() + + return q.len +} + // Dropped returns the number of Records dropped during enqueueing since the // last time Dropped was called. func (q *queue) Dropped() uint64 { From 668e54f6f588ac33cf5f877d5a643623fc980a3a Mon Sep 17 00:00:00 2001 From: wmdanor Date: Tue, 8 Apr 2025 18:52:59 +0100 Subject: [PATCH 10/10] replace isqueuefull with ready --- sdk/log/batch.go | 2 +- sdk/log/exporter.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index f6d9d6f9d19..dd4c7a18c98 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -158,7 +158,7 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { qLen := b.q.Len() // Don't copy data from queue unless exporter can accept more, it is very expensive. - if !b.exporter.IsQueueFull() { + if b.exporter.Ready() { qLen = b.q.TryDequeue(buf, func(r []Record) bool { ok := b.exporter.EnqueueExport(r) if ok { diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index 166a773ad92..8cef5dde6b5 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -200,8 +200,8 @@ func newBufferExporter(exporter Exporter, size int) *bufferExporter { } } -func (e *bufferExporter) IsQueueFull() bool { - return len(e.input) == cap(e.input) +func (e *bufferExporter) Ready() bool { + return len(e.input) != cap(e.input) } var errStopped = errors.New("exporter stopped")