diff --git a/CHANGELOG.md b/CHANGELOG.md index edc38211738..c2f7ff60d22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Changed + +- `ForceFlush` of the `NewBatchSpanProcessor` in `go.opentelemetry.io/otel/sdk/trace` calls `SpanExporter.ExportSpans` synchronously. (#6416) + ### Removed - Drop support for [Go 1.22]. (#6381, #6418) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index 6872cbb4e7a..7d832fb543d 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -186,33 +186,23 @@ func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { return nil } - var err error - if bsp.e != nil { - flushCh := make(chan struct{}) - if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) { - select { - case <-bsp.stopCh: - // The batchSpanProcessor is Shutdown. - return nil - case <-flushCh: - // Processed any items in queue prior to ForceFlush being called - case <-ctx.Done(): - return ctx.Err() - } - } + if bsp.e == nil { + return nil + } - wait := make(chan error, 1) - go func() { - wait <- bsp.exportSpans(ctx) - }() - // Wait until the export is finished or the context is cancelled/timed out + flushCh := make(chan struct{}) + if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) { select { - case err = <-wait: + case <-bsp.stopCh: + // The batchSpanProcessor is Shutdown. + return nil + case <-flushCh: + // Processed any items in queue prior to ForceFlush being called case <-ctx.Done(): - err = ctx.Err() + return ctx.Err() } } - return err + return bsp.exportSpans(ctx) } // WithMaxQueueSize returns a BatchSpanProcessorOption that configures the diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 82cd1db0cb1..f430d93d323 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -524,24 +524,13 @@ func assertMaxSpanDiff(t *testing.T, want, got, maxDif int) { } } -type indefiniteExporter struct { - stop chan (struct{}) -} - -func newIndefiniteExporter(t *testing.T) indefiniteExporter { - e := indefiniteExporter{stop: make(chan struct{})} - t.Cleanup(func() { - go close(e.stop) - }) - return e -} +type indefiniteExporter struct{} func (e indefiniteExporter) Shutdown(context.Context) error { return nil } func (e indefiniteExporter) ExportSpans(ctx context.Context, _ []ReadOnlySpan) error { - <-e.stop return ctx.Err() } @@ -550,7 +539,7 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) { // Cancel the context cancel() - bsp := NewBatchSpanProcessor(newIndefiniteExporter(t)) + bsp := NewBatchSpanProcessor(indefiniteExporter{}) t.Cleanup(func() { assert.NoError(t, bsp.Shutdown(context.Background())) }) @@ -560,24 +549,6 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) { } } -func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) { - tp := basicTracerProvider(t) - exp := newIndefiniteExporter(t) - bsp := NewBatchSpanProcessor(exp) - tp.RegisterSpanProcessor(bsp) - tr := tp.Tracer(t.Name()) - _, span := tr.Start(context.Background(), "foo") - span.End() - - // Add timeout to context to test deadline - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) - defer cancel() - - if got, want := bsp.ForceFlush(ctx), context.DeadlineExceeded; !errors.Is(got, want) { - t.Errorf("expected %q error, got %v", want, got) - } -} - func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) { ctx := context.Background()