Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
450 changes: 2 additions & 448 deletions example/jaeger/go.sum

Large diffs are not rendered by default.

29 changes: 10 additions & 19 deletions example/jaeger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,22 @@ import (
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// initTracer creates a new trace provider instance and registers it as global trace provider.
func initTracer() func() {
func main() {
ctx := context.Background()

// Create and install Jaeger export pipeline.
flush, err := jaeger.InstallNewPipeline(
err := jaeger.InstallNewPipeline(
jaeger.WithCollectorEndpoint("http://localhost:14268/api/traces"),
jaeger.WithSDKOptions(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.ServiceNameKey.String("trace-demo"),
attribute.String("exporter", "jaeger"),
attribute.Float64("float", 312.23),
)),
),
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.ServiceNameKey.String("trace-demo"),
attribute.String("exporter", "jaeger"),
attribute.Float64("float", 312.23),
)),
)
if err != nil {
log.Fatal(err)
}
return flush
}

func main() {
ctx := context.Background()

flush := initTracer()
defer flush()

tr := otel.Tracer("component-main")
ctx, span := tr.Start(ctx, "foo")
Expand Down
4 changes: 2 additions & 2 deletions exporters/trace/jaeger/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ func newAgentClientUDP(params agentClientUDPParams) (*agentClientUDP, error) {
}

// EmitBatch implements EmitBatch() of Agent interface
func (a *agentClientUDP) EmitBatch(batch *gen.Batch) error {
func (a *agentClientUDP) EmitBatch(ctx context.Context, batch *gen.Batch) error {
a.thriftBuffer.Reset()
if err := a.client.EmitBatch(context.Background(), batch); err != nil {
if err := a.client.EmitBatch(ctx, batch); err != nil {
return err
}
if a.thriftBuffer.Len() > a.maxPacketSize {
Expand Down
1 change: 0 additions & 1 deletion exporters/trace/jaeger/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
go.opentelemetry.io/otel v0.19.0
go.opentelemetry.io/otel/sdk v0.19.0
go.opentelemetry.io/otel/trace v0.19.0
google.golang.org/api v0.44.0
)

replace go.opentelemetry.io/otel/bridge/opencensus => ../../../bridge/opencensus
Expand Down
450 changes: 2 additions & 448 deletions exporters/trace/jaeger/go.sum

Large diffs are not rendered by default.

206 changes: 40 additions & 166 deletions exporters/trace/jaeger/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"sync"

"google.golang.org/api/support/bundler"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -44,67 +41,16 @@ const (
keyEventName = "event"
)

type Option func(*options)

// options are the options to be used when initializing a Jaeger export.
type options struct {

// BufferMaxCount defines the total number of traces that can be buffered in memory
BufferMaxCount int

// BatchMaxCount defines the maximum number of spans sent in one batch
BatchMaxCount int

// TracerProviderOptions defines the options for tracer provider of sdk.
TracerProviderOptions []sdktrace.TracerProviderOption

Disabled bool
}

// WithBufferMaxCount defines the total number of traces that can be buffered in memory
func WithBufferMaxCount(bufferMaxCount int) Option {
return func(o *options) {
o.BufferMaxCount = bufferMaxCount
}
}

// WithBatchMaxCount defines the maximum number of spans in one batch
func WithBatchMaxCount(batchMaxCount int) Option {
return func(o *options) {
o.BatchMaxCount = batchMaxCount
}
}

// WithSDKOptions configures options for tracer provider of sdk.
func WithSDKOptions(opts ...sdktrace.TracerProviderOption) Option {
return func(o *options) {
o.TracerProviderOptions = opts
}
}

// WithDisabled option will cause pipeline methods to use
// a no-op provider
func WithDisabled(disabled bool) Option {
return func(o *options) {
o.Disabled = disabled
}
}

// NewRawExporter returns an OTel Exporter implementation that exports the
// collected spans to Jaeger.
//
// It will IGNORE Disabled option.
func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, error) {
func NewRawExporter(endpointOption EndpointOption) (*Exporter, error) {
uploader, err := endpointOption()
if err != nil {
return nil, err
}

o := options{}
for _, opt := range opts {
opt(&o)
}

// Fetch default service.name from default resource for backup
var defaultServiceName string
defaultResource := resource.Default()
Expand All @@ -115,145 +61,92 @@ func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, e
return nil, fmt.Errorf("failed to get service name from default resource")
}

stopCh := make(chan struct{})
e := &Exporter{
uploader: uploader,
o: o,
stopCh: stopCh,
defaultServiceName: defaultServiceName,
}
bundler := bundler.NewBundler((*sdktrace.SpanSnapshot)(nil), func(bundle interface{}) {
if err := e.upload(bundle.([]*sdktrace.SpanSnapshot)); err != nil {
otel.Handle(err)
}
})

// Set BufferedByteLimit with the total number of spans that are permissible to be held in memory.
// This needs to be done since the size of messages is always set to 1. Failing to set this would allow
// 1G messages to be held in memory since that is the default value of BufferedByteLimit.
if o.BufferMaxCount != 0 {
bundler.BufferedByteLimit = o.BufferMaxCount
}

// The default value bundler uses is 10, increase to send larger batches
if o.BatchMaxCount != 0 {
bundler.BundleCountThreshold = o.BatchMaxCount
}

e.bundler = bundler
return e, nil
}

// NewExportPipeline sets up a complete export pipeline
// with the recommended setup for trace provider
func NewExportPipeline(endpointOption EndpointOption, opts ...Option) (trace.TracerProvider, func(), error) {
o := options{}
for _, opt := range opts {
opt(&o)
}
if o.Disabled {
return trace.NewNoopTracerProvider(), func() {}, nil
}

exporter, err := NewRawExporter(endpointOption, opts...)
func NewExportPipeline(endpointOption EndpointOption, opts ...sdktrace.TracerProviderOption) (*sdktrace.TracerProvider, error) {
exporter, err := NewRawExporter(endpointOption)
if err != nil {
return nil, nil, err
return nil, err
}

pOpts := append(exporter.o.TracerProviderOptions, sdktrace.WithSyncer(exporter))
tp := sdktrace.NewTracerProvider(pOpts...)
return tp, exporter.Flush, nil
tp := sdktrace.NewTracerProvider(append(opts, sdktrace.WithBatcher(exporter))...)
return tp, nil
}

// InstallNewPipeline instantiates a NewExportPipeline with the
// recommended configuration and registers it globally.
func InstallNewPipeline(endpointOption EndpointOption, opts ...Option) (func(), error) {
tp, flushFn, err := NewExportPipeline(endpointOption, opts...)
func InstallNewPipeline(endpointOption EndpointOption, opts ...sdktrace.TracerProviderOption) error {
tp, err := NewExportPipeline(endpointOption, opts...)
if err != nil {
return nil, err
return err
}

otel.SetTracerProvider(tp)
return flushFn, nil
}

// Process contains the information exported to jaeger about the source
// of the trace data.
type Process struct {
// ServiceName is the Jaeger service name.
ServiceName string

// Tags are added to Jaeger Process exports
Tags []attribute.KeyValue
return nil
}

// Exporter is an implementation of an OTel SpanSyncer that uploads spans to
// Jaeger.
// Exporter exports OpenTelemetry spans to a Jaeger agent or collector.
type Exporter struct {
bundler *bundler.Bundler
uploader batchUploader
o options

stoppedMu sync.RWMutex
stopped bool
stopCh chan struct{}

defaultServiceName string
}

var _ sdktrace.SpanExporter = (*Exporter)(nil)

// ExportSpans exports SpanSnapshots to Jaeger.
func (e *Exporter) ExportSpans(ctx context.Context, ss []*sdktrace.SpanSnapshot) error {
e.stoppedMu.RLock()
stopped := e.stopped
e.stoppedMu.RUnlock()
if stopped {
// ExportSpans transforms and exports OpenTelemetry spans to Jaeger.
func (e *Exporter) ExportSpans(ctx context.Context, spans []*sdktrace.SpanSnapshot) error {
// Return fast if context is already canceled or Exporter shutdown.
select {
case <-ctx.Done():
return ctx.Err()
case <-e.stopCh:
return nil
default:
}

for _, span := range ss {
// TODO(jbd): Handle oversized bundlers.
err := e.bundler.AddWait(ctx, span, 1)
if err != nil {
return fmt.Errorf("failed to bundle %q: %w", span.Name, err)
}
// Cancel export if Exporter is shutdown.
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
go func(ctx context.Context, cancel context.CancelFunc) {
select {
case <-ctx.Done():
return ctx.Err()
default:
case <-e.stopCh:
cancel()
}
}(ctx, cancel)

for _, batch := range jaegerBatchList(spans, e.defaultServiceName) {
if err := e.uploader.upload(ctx, batch); err != nil {
return err
}
}

return nil
}

// flush is used to wrap the bundler's Flush method for testing.
var flush = func(e *Exporter) {
e.bundler.Flush()
}

// Shutdown stops the exporter flushing any pending exports.
// Shutdown stops the Exporter. This will close all connections and release
// all resources held by the Exporter.
func (e *Exporter) Shutdown(ctx context.Context) error {
e.stoppedMu.Lock()
e.stopped = true
e.stoppedMu.Unlock()

done := make(chan struct{}, 1)
// Shadow so if the goroutine is leaked in testing it doesn't cause a race
// condition when the file level var is reset.
go func(FlushFunc func(*Exporter)) {
// The OpenTelemetry specification is explicit in not having this
// method block so the preference here is to orphan this goroutine if
// the context is canceled or times out while this flushing is
// occurring. This is a consequence of the bundler Flush method not
// supporting a context.
FlushFunc(e)
done <- struct{}{}
}(flush)
close(e.stopCh)
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
default:
}
return nil
return e.uploader.shutdown(ctx)
}

func spanSnapshotToThrift(ss *sdktrace.SpanSnapshot) *gen.Span {
Expand Down Expand Up @@ -416,25 +309,6 @@ func getBoolTag(k string, b bool) *gen.Tag {
}
}

// Flush waits for exported trace spans to be uploaded.
//
// This is useful if your program is ending and you do not want to lose recent spans.
func (e *Exporter) Flush() {
flush(e)
}

func (e *Exporter) upload(spans []*sdktrace.SpanSnapshot) error {
batchList := jaegerBatchList(spans, e.defaultServiceName)
for _, batch := range batchList {
err := e.uploader.upload(batch)
if err != nil {
return err
}
}

return nil
}

// jaegerBatchList transforms a slice of SpanSnapshot into a slice of jaeger
// Batch.
func jaegerBatchList(ssl []*sdktrace.SpanSnapshot, defaultServiceName string) []*gen.Batch {
Expand Down
Loading