Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Added `WithOSDescription` resource configuration option to set OS (Operating System) description resource attribute (`os.description`). (#1840)
- Added `WithOS` resource configuration option to set all OS (Operating System) resource attributes at once. (#1840)
- Added the `WithRetry` option to the `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp` package.
This option is a replacement for the removed `WithMaxAttempts` and `WithBackoff` options. (#2095)
- Added API `LinkFromContext` to return Link which encapsulates SpanContext from provided context and also encapsulates attributes. (#2115)

### Changed

- The `SpanModels` function is now exported from the `go.opentelemetry.io/otel/exporters/zipkin` package to convert OpenTelemetry spans into Zipkin model spans. (#2027)
- Rename the `"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc".RetrySettings` to `RetryConfig`. (#2095)
- Rename the `"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp".RetrySettings` to `RetryConfig`. (#2095)

### Deprecated

Expand All @@ -28,6 +32,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Removed the deprecated package `go.opentelemetry.io/otel/exporters/trace/zipkin`. (#2020)
- Removed the `"go.opentelemetry.io/otel/sdk/resource".WithBuiltinDetectors` function.
The explicit `With*` options for every built-in detector should be used instead. (#2026 #2097)
- Removed the `WithMaxAttempts` and `WithBackoff` options from the `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp` package.
The retry logic of the package has been updated to match the `otlptracegrpc` package and accordingly a `WithRetry` option is added that should be used instead. (#2095)
- Removed metrics test package `go.opentelemetry.io/otel/sdk/export/metric/metrictest`. (#2105)

### Fixed
Expand Down
145 changes: 24 additions & 121 deletions exporters/otlp/otlptrace/internal/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,20 @@ package connection

import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/cenkalti/backoff/v4"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"google.golang.org/grpc/encoding/gzip"

"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
Expand All @@ -48,6 +47,7 @@ type Connection struct {
// these fields are read-only after constructor is finished
cfg otlpconfig.Config
SCfg otlpconfig.SignalConfig
requestFunc retry.RequestFunc
metadata metadata.MD
newConnectionHandler func(cc *grpc.ClientConn)

Expand All @@ -66,6 +66,7 @@ func NewConnection(cfg otlpconfig.Config, sCfg otlpconfig.SignalConfig, handler
c := new(Connection)
c.newConnectionHandler = handler
c.cfg = cfg
c.requestFunc = cfg.RetryConfig.RequestFunc(evaluate)
c.SCfg = sCfg
if len(c.SCfg.Headers) > 0 {
c.metadata = metadata.New(c.SCfg.Headers)
Expand Down Expand Up @@ -287,143 +288,45 @@ func (c *Connection) ContextWithStop(ctx context.Context) (context.Context, cont
}

func (c *Connection) DoRequest(ctx context.Context, fn func(context.Context) error) error {
expBackoff := newExponentialBackoff(c.cfg.RetrySettings)

for {
ctx, cancel := c.ContextWithStop(ctx)
defer cancel()
return c.requestFunc(ctx, func(ctx context.Context) error {
err := fn(ctx)
if err == nil {
// request succeeded.
return nil
}

if !c.cfg.RetrySettings.Enabled {
return err
}

// We have an error, check gRPC status code.
st := status.Convert(err)
if st.Code() == codes.OK {
// Not really an error, still success.
return nil
}

// Now, this is this a real error.

if !shouldRetry(st.Code()) {
// It is not a retryable error, we should not retry.
return err
}

// Need to retry.

throttle := getThrottleDuration(st)

backoffDelay := expBackoff.NextBackOff()
if backoffDelay == backoff.Stop {
// throw away the batch
err = fmt.Errorf("max elapsed time expired: %w", err)
return err
}

var delay time.Duration

if backoffDelay > throttle {
delay = backoffDelay
} else {
if expBackoff.GetElapsedTime()+throttle > expBackoff.MaxElapsedTime {
err = fmt.Errorf("max elapsed time expired when respecting server throttle: %w", err)
return err
}

// Respect server throttling.
delay = throttle
}

// back-off, but get interrupted when shutting down or request is cancelled or timed out.
err = func() error {
dt := time.NewTimer(delay)
defer dt.Stop()

select {
case <-ctx.Done():
return ctx.Err()
case <-c.stopCh:
return fmt.Errorf("interrupted due to shutdown: %w", err)
case <-dt.C:
}

// nil is converted to OK.
if status.Code(err) == codes.OK {
// Success.
return nil
}()

if err != nil {
return err
}

}
return err
})
}

func shouldRetry(code codes.Code) bool {
switch code {
case codes.OK:
// Success. This function should not be called for this code, the best we
// can do is tell the caller not to retry.
return false

// evaluate returns if err is retry-able and a duration to wait for if an
// explicit throttle time is included in err.
func evaluate(err error) (bool, time.Duration) {
s := status.Convert(err)
switch s.Code() {
case codes.Canceled,
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Aborted,
codes.OutOfRange,
codes.Unavailable,
codes.DataLoss:
// These are retryable errors.
return true

case codes.Unknown,
codes.InvalidArgument,
codes.Unauthenticated,
codes.PermissionDenied,
codes.NotFound,
codes.AlreadyExists,
codes.FailedPrecondition,
codes.Unimplemented,
codes.Internal:
// These are fatal errors, don't retry.
return false

default:
// Don't retry on unknown codes.
return false
return true, throttleDelay(s)
}

// Not a retry-able error.
return false, 0
}

func getThrottleDuration(status *status.Status) time.Duration {
// See if throttling information is available.
// throttleDelay returns a duration to wait for if an explicit throttle time
// is included in the response status.
func throttleDelay(status *status.Status) time.Duration {
for _, detail := range status.Details() {
if t, ok := detail.(*errdetails.RetryInfo); ok {
if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 {
// We are throttled. Wait before retrying as requested by the server.
return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond
}
return 0
return t.RetryDelay.AsDuration()
}
}
return 0
}

func newExponentialBackoff(rs otlpconfig.RetrySettings) *backoff.ExponentialBackOff {
// Do not use NewExponentialBackOff since it calls Reset and the code here must
// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
expBackoff := &backoff.ExponentialBackOff{
InitialInterval: rs.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: rs.MaxInterval,
MaxElapsedTime: rs.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
expBackoff.Reset()

return expBackoff
}
Loading