diff --git a/exporters/trace/jaeger/jaeger.go b/exporters/trace/jaeger/jaeger.go index f5c4112e7ac..2989bb65b31 100644 --- a/exporters/trace/jaeger/jaeger.go +++ b/exporters/trace/jaeger/jaeger.go @@ -19,7 +19,6 @@ import ( "encoding/binary" "encoding/json" "fmt" - "sync" "google.golang.org/api/support/bundler" @@ -115,8 +114,10 @@ func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, e return nil, fmt.Errorf("failed to get service name from default resource") } + stopCh := make(chan struct{}) e := &Exporter{ uploader: uploader, + stopCh: stopCh, defaultServiceName: defaultServiceName, } bundler := bundler.NewBundler((*sdktrace.SpanSnapshot)(nil), func(bundle interface{}) { @@ -180,8 +181,7 @@ type Exporter struct { bundler *bundler.Bundler uploader batchUploader - stoppedMu sync.RWMutex - stopped bool + stopCh chan struct{} defaultServiceName string } @@ -190,13 +190,27 @@ var _ sdktrace.SpanExporter = (*Exporter)(nil) // ExportSpans exports SpanSnapshots to Jaeger. func (e *Exporter) ExportSpans(ctx context.Context, ss []*sdktrace.SpanSnapshot) error { - e.stoppedMu.RLock() - stopped := e.stopped - e.stoppedMu.RUnlock() - if stopped { + // Return fast if context is already canceled or Exporter shutdown. + select { + case <-ctx.Done(): + return ctx.Err() + case <-e.stopCh: return nil + default: } + // Cancel export if Exporter is shutdown. + var cancel context.CancelFunc + ctx, cancel = context.WithCancel(ctx) + defer cancel() + go func(ctx context.Context, cancel context.CancelFunc) { + select { + case <-ctx.Done(): + case <-e.stopCh: + cancel() + } + }(ctx, cancel) + for _, span := range ss { // TODO(jbd): Handle oversized bundlers. err := e.bundler.AddWait(ctx, span, 1) @@ -220,9 +234,8 @@ var flush = func(e *Exporter) { // Shutdown stops the exporter flushing any pending exports. func (e *Exporter) Shutdown(ctx context.Context) error { - e.stoppedMu.Lock() - e.stopped = true - e.stoppedMu.Unlock() + // Stop any active and subsequent exports. + close(e.stopCh) done := make(chan struct{}, 1) // Shadow so if the goroutine is leaked in testing it doesn't cause a race @@ -408,6 +421,12 @@ func getBoolTag(k string, b bool) *gen.Tag { // // This is useful if your program is ending and you do not want to lose recent spans. func (e *Exporter) Flush() { + // Return fast if Exporter shutdown. + select { + case <-e.stopCh: + return + default: + } flush(e) }