Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Changed

- `ForceFlush` of the `NewBatchSpanProcessor` in `go.opentelemetry.io/otel/sdk/trace` calls `SpanExporter.ExportSpans` synchronously. (#6416)

Comment on lines +11 to +14
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for the record, this is a significant change in behavior, and can easily be interpreted as a breaking change.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More like this https://xkcd.com/1172/

### Removed

- Drop support for [Go 1.22]. (#6381, #6418)
Expand Down
34 changes: 12 additions & 22 deletions sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,33 +186,23 @@ func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
return nil
}

var err error
if bsp.e != nil {
flushCh := make(chan struct{})
if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) {
select {
case <-bsp.stopCh:
// The batchSpanProcessor is Shutdown.
return nil
case <-flushCh:
// Processed any items in queue prior to ForceFlush being called
case <-ctx.Done():
return ctx.Err()
}
}
if bsp.e == nil {
return nil
}

wait := make(chan error, 1)
go func() {
wait <- bsp.exportSpans(ctx)
}()
// Wait until the export is finished or the context is cancelled/timed out
flushCh := make(chan struct{})
if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) {
select {
case err = <-wait:
case <-bsp.stopCh:
// The batchSpanProcessor is Shutdown.
return nil
case <-flushCh:
// Processed any items in queue prior to ForceFlush being called
case <-ctx.Done():
err = ctx.Err()
return ctx.Err()
}
}
return err
return bsp.exportSpans(ctx)
}

// WithMaxQueueSize returns a BatchSpanProcessorOption that configures the
Expand Down
33 changes: 2 additions & 31 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,24 +524,13 @@ func assertMaxSpanDiff(t *testing.T, want, got, maxDif int) {
}
}

type indefiniteExporter struct {
stop chan (struct{})
}

func newIndefiniteExporter(t *testing.T) indefiniteExporter {
e := indefiniteExporter{stop: make(chan struct{})}
t.Cleanup(func() {
go close(e.stop)
})
return e
}
type indefiniteExporter struct{}

func (e indefiniteExporter) Shutdown(context.Context) error {
return nil
}

func (e indefiniteExporter) ExportSpans(ctx context.Context, _ []ReadOnlySpan) error {
<-e.stop
return ctx.Err()
}

Expand All @@ -550,7 +539,7 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
// Cancel the context
cancel()

bsp := NewBatchSpanProcessor(newIndefiniteExporter(t))
bsp := NewBatchSpanProcessor(indefiniteExporter{})
t.Cleanup(func() {
assert.NoError(t, bsp.Shutdown(context.Background()))
})
Expand All @@ -560,24 +549,6 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
}
}

func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) {
tp := basicTracerProvider(t)
exp := newIndefiniteExporter(t)
bsp := NewBatchSpanProcessor(exp)
tp.RegisterSpanProcessor(bsp)
tr := tp.Tracer(t.Name())
_, span := tr.Start(context.Background(), "foo")
span.End()

// Add timeout to context to test deadline
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()

if got, want := bsp.ForceFlush(ctx), context.DeadlineExceeded; !errors.Is(got, want) {
t.Errorf("expected %q error, got %v", want, got)
}
}
Comment on lines -563 to -579
Copy link
Copy Markdown
Member Author

@pellared pellared Mar 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was verifying a scenario when SpanExporter.ExportSpans hangs. Now we would be hanging because we are calling synchronously. I think this is fine as returning prematurely leaks a goroutine.


func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) {
ctx := context.Background()

Expand Down
Loading