Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `Event` and `Link` struct types from the `go.opentelemetry.io/otel` package now include a `DroppedAttributeCount` field to record the number of attributes that were not recorded due to configured limits being reached. (#1771)
- The Jaeger exporter now reports dropped attributes for a Span event in the exported log. (#1771)
- Adds `k8s.node.name` and `k8s.node.uid` attribute keys to the `semconv` package. (#1789)
- Adds `otlpgrpc.WithTimeout` option for configuring timeout to the otlp/gRPC exporter. (#1821)

### Fixed

Expand Down
4 changes: 4 additions & 0 deletions exporters/otlp/otlpgrpc/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet,
}
ctx, cancel := d.metricsDriver.connection.contextWithStop(ctx)
defer cancel()
ctx, tCancel := context.WithTimeout(ctx, d.metricsDriver.connection.sCfg.Timeout)
defer tCancel()

rms, err := transform.CheckpointSet(ctx, selector, cps, 1)
if err != nil {
Expand Down Expand Up @@ -162,6 +164,8 @@ func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot)
}
ctx, cancel := d.tracesDriver.connection.contextWithStop(ctx)
defer cancel()
ctx, tCancel := context.WithTimeout(ctx, d.tracesDriver.connection.sCfg.Timeout)
defer tCancel()

protoSpans := transform.SpanData(ss)
if len(protoSpans) == 0 {
Expand Down
8 changes: 8 additions & 0 deletions exporters/otlp/otlpgrpc/mock_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type mockTraceService struct {
mu sync.RWMutex
storage otlptest.SpansStorage
headers metadata.MD
delay time.Duration
}

func (mts *mockTraceService) getHeaders() metadata.MD {
Expand All @@ -73,6 +74,9 @@ func (mts *mockTraceService) getResourceSpans() []*tracepb.ResourceSpans {
}

func (mts *mockTraceService) Export(ctx context.Context, exp *collectortracepb.ExportTraceServiceRequest) (*collectortracepb.ExportTraceServiceResponse, error) {
if mts.delay > 0 {
time.Sleep(mts.delay)
}
reply := &collectortracepb.ExportTraceServiceResponse{}
mts.mu.Lock()
defer mts.mu.Unlock()
Expand All @@ -86,6 +90,7 @@ type mockMetricService struct {

mu sync.RWMutex
storage otlptest.MetricsStorage
delay time.Duration
}

func (mms *mockMetricService) getMetrics() []*metricpb.Metric {
Expand All @@ -95,6 +100,9 @@ func (mms *mockMetricService) getMetrics() []*metricpb.Metric {
}

func (mms *mockMetricService) Export(ctx context.Context, exp *collectormetricpb.ExportMetricsServiceRequest) (*collectormetricpb.ExportMetricsServiceResponse, error) {
if mms.delay > 0 {
time.Sleep(mms.delay)
}
reply := &collectormetricpb.ExportMetricsServiceResponse{}
mms.mu.Lock()
defer mms.mu.Unlock()
Expand Down
18 changes: 18 additions & 0 deletions exporters/otlp/otlpgrpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,21 @@ func WithDialOption(opts ...grpc.DialOption) Option {
cfg.DialOptions = opts
})
}

// WithTimeout tells the driver the max waiting time for the backend to process
// each spans or metrics batch. If unset, the default will be 10 seconds.
func WithTimeout(duration time.Duration) Option {
return otlpconfig.WithTimeout(duration)
}

// WithTracesTimeout tells the driver the max waiting time for the backend to process
// each spans batch. If unset, the default will be 10 seconds.
func WithTracesTimeout(duration time.Duration) Option {
return otlpconfig.WithTracesTimeout(duration)
}

// WithMetricsTimeout tells the driver the max waiting time for the backend to process
// each metrics batch. If unset, the default will be 10 seconds.
func WithMetricsTimeout(duration time.Duration) Option {
return otlpconfig.WithMetricsTimeout(duration)
}
87 changes: 87 additions & 0 deletions exporters/otlp/otlpgrpc/otlp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"testing"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -302,6 +305,90 @@ func TestNewExporter_withHeaders(t *testing.T) {
assert.Equal(t, "value1", headers.Get("header1")[0])
}

func TestNewExporter_WithTimeout(t *testing.T) {
tts := []struct {
name string
fn func(exp *otlp.Exporter) error
timeout time.Duration
metrics int
spans int
code codes.Code
delay bool
}{
{
name: "Timeout Spans",
fn: func(exp *otlp.Exporter) error {
return exp.ExportSpans(context.Background(), []*sdktrace.SpanSnapshot{{Name: "timed out"}})
},
timeout: time.Millisecond * 100,
code: codes.DeadlineExceeded,
delay: true,
},
{
name: "Timeout Metrics",
fn: func(exp *otlp.Exporter) error {
return exp.Export(context.Background(), otlptest.OneRecordCheckpointSet{})
},
timeout: time.Millisecond * 100,
code: codes.DeadlineExceeded,
delay: true,
},

{
name: "No Timeout Spans",
fn: func(exp *otlp.Exporter) error {
return exp.ExportSpans(context.Background(), []*sdktrace.SpanSnapshot{{Name: "timed out"}})
},
timeout: time.Minute,
spans: 1,
code: codes.OK,
},
{
name: "No Timeout Metrics",
fn: func(exp *otlp.Exporter) error {
return exp.Export(context.Background(), otlptest.OneRecordCheckpointSet{})
},
timeout: time.Minute,
metrics: 1,
code: codes.OK,
},
}

for _, tt := range tts {
t.Run(tt.name, func(t *testing.T) {

mc := runMockCollector(t)
if tt.delay {
mc.traceSvc.delay = time.Second * 10
mc.metricSvc.delay = time.Second * 10
}
defer func() {
_ = mc.stop()
}()

ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint, otlpgrpc.WithTimeout(tt.timeout))
defer func() {
_ = exp.Shutdown(ctx)
}()

err := tt.fn(exp)

if tt.code == codes.OK {
require.NoError(t, err)
} else {
require.Error(t, err)
}

s := status.Convert(err)
require.Equal(t, tt.code, s.Code())

require.Len(t, mc.getSpans(), tt.spans)
require.Len(t, mc.getMetrics(), tt.metrics)
})
}
}

func TestNewExporter_withInvalidSecurityConfiguration(t *testing.T) {
mc := runMockCollector(t)
defer func() {
Expand Down