From 919fb4c438475217d89a6dbba80a3529f016d160 Mon Sep 17 00:00:00 2001 From: Robert Pajak Date: Thu, 6 Mar 2025 08:41:26 +0100 Subject: [PATCH 1/2] sdk/trace: batchSpanProcessor.ForceFlush calls SpanExporter.ExportSpans synchronously --- CHANGELOG.md | 4 +++ sdk/trace/batch_span_processor.go | 34 +++++++++----------------- sdk/trace/batch_span_processor_test.go | 33 ++----------------------- 3 files changed, 18 insertions(+), 53 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f01d9796a5b..d88e99ed8be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Changed + +- `ForceFlush` of the `NewBatchSpanProcessor` in `go.opentelemetry.io/otel/sdk/trace` calls `SpanExporter.ExportSpans` synchronously. (#TODO) + ### Removed - Drop support for [Go 1.22]. (#6381) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index 6872cbb4e7a..7d832fb543d 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -186,33 +186,23 @@ func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { return nil } - var err error - if bsp.e != nil { - flushCh := make(chan struct{}) - if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) { - select { - case <-bsp.stopCh: - // The batchSpanProcessor is Shutdown. - return nil - case <-flushCh: - // Processed any items in queue prior to ForceFlush being called - case <-ctx.Done(): - return ctx.Err() - } - } + if bsp.e == nil { + return nil + } - wait := make(chan error, 1) - go func() { - wait <- bsp.exportSpans(ctx) - }() - // Wait until the export is finished or the context is cancelled/timed out + flushCh := make(chan struct{}) + if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) { select { - case err = <-wait: + case <-bsp.stopCh: + // The batchSpanProcessor is Shutdown. + return nil + case <-flushCh: + // Processed any items in queue prior to ForceFlush being called case <-ctx.Done(): - err = ctx.Err() + return ctx.Err() } } - return err + return bsp.exportSpans(ctx) } // WithMaxQueueSize returns a BatchSpanProcessorOption that configures the diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 82cd1db0cb1..f430d93d323 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -524,24 +524,13 @@ func assertMaxSpanDiff(t *testing.T, want, got, maxDif int) { } } -type indefiniteExporter struct { - stop chan (struct{}) -} - -func newIndefiniteExporter(t *testing.T) indefiniteExporter { - e := indefiniteExporter{stop: make(chan struct{})} - t.Cleanup(func() { - go close(e.stop) - }) - return e -} +type indefiniteExporter struct{} func (e indefiniteExporter) Shutdown(context.Context) error { return nil } func (e indefiniteExporter) ExportSpans(ctx context.Context, _ []ReadOnlySpan) error { - <-e.stop return ctx.Err() } @@ -550,7 +539,7 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) { // Cancel the context cancel() - bsp := NewBatchSpanProcessor(newIndefiniteExporter(t)) + bsp := NewBatchSpanProcessor(indefiniteExporter{}) t.Cleanup(func() { assert.NoError(t, bsp.Shutdown(context.Background())) }) @@ -560,24 +549,6 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) { } } -func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) { - tp := basicTracerProvider(t) - exp := newIndefiniteExporter(t) - bsp := NewBatchSpanProcessor(exp) - tp.RegisterSpanProcessor(bsp) - tr := tp.Tracer(t.Name()) - _, span := tr.Start(context.Background(), "foo") - span.End() - - // Add timeout to context to test deadline - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) - defer cancel() - - if got, want := bsp.ForceFlush(ctx), context.DeadlineExceeded; !errors.Is(got, want) { - t.Errorf("expected %q error, got %v", want, got) - } -} - func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) { ctx := context.Background() From 734e5882a61475092a09c9981e28efe6fc82abb7 Mon Sep 17 00:00:00 2001 From: Robert Pajak Date: Thu, 6 Mar 2025 08:42:01 +0100 Subject: [PATCH 2/2] Update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d88e99ed8be..0d27d5ba278 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed -- `ForceFlush` of the `NewBatchSpanProcessor` in `go.opentelemetry.io/otel/sdk/trace` calls `SpanExporter.ExportSpans` synchronously. (#TODO) +- `ForceFlush` of the `NewBatchSpanProcessor` in `go.opentelemetry.io/otel/sdk/trace` calls `SpanExporter.ExportSpans` synchronously. (#6416) ### Removed