Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,21 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Added support for configuring OTLP/HTTP Endpoints, Headers, Compression and Timeout via the Environment Variables. (#1758)
- `OTEL_EXPORTER_OTLP_ENDPOINT`
- `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`
- `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`
- `OTEL_EXPORTER_OTLP_HEADERS`
- `OTEL_EXPORTER_OTLP_TRACES_HEADERS`
- `OTEL_EXPORTER_OTLP_METRICS_HEADERS`
- `OTEL_EXPORTER_OTLP_COMPRESSION`
- `OTEL_EXPORTER_OTLP_TRACES_COMPRESSION`
- `OTEL_EXPORTER_OTLP_METRICS_COMPRESSION`
- `OTEL_EXPORTER_OTLP_TIMEOUT`
- `OTEL_EXPORTER_OTLP_TRACES_TIMEOUT`
- `OTEL_EXPORTER_OTLP_METRICS_TIMEOUT`
### Fixed

- The `Span.IsRecording` implementation from `go.opentelemetry.io/otel/sdk/trace` always returns false when not being sampled. (#1750)
Expand Down
89 changes: 59 additions & 30 deletions exporters/otlp/otlphttp/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math/rand"
"net"
"net/http"
"os"
"path"
"strings"
"time"
Expand Down Expand Up @@ -62,30 +63,33 @@ var ourTransport *http.Transport = &http.Transport{
}

type driver struct {
client *http.Client
cfg config
metricsDriver signalDriver
tracesDriver signalDriver
cfg config

stopCh chan struct{}
}

type signalDriver struct {
cfg signalConfig
generalCfg config
client *http.Client
stopCh chan struct{}
}

var _ otlp.ProtocolDriver = (*driver)(nil)

// NewDriver creates a new HTTP driver.
func NewDriver(opts ...Option) otlp.ProtocolDriver {
cfg := config{
endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort),
compression: NoCompression,
tracesURLPath: DefaultTracesPath,
metricsURLPath: DefaultMetricsPath,
maxAttempts: DefaultMaxAttempts,
backoff: DefaultBackoff,
}
cfg := newDefaultConfig()
applyEnvConfigs(&cfg, os.Getenv)

for _, opt := range opts {
opt.Apply(&cfg)
}
for pathPtr, defaultPath := range map[*string]string{
&cfg.tracesURLPath: DefaultTracesPath,
&cfg.metricsURLPath: DefaultMetricsPath,
&cfg.traces.urlPath: DefaultTracesPath,
&cfg.metrics.urlPath: DefaultMetricsPath,
} {
tmp := strings.TrimSpace(*pathPtr)
if tmp == "" {
Expand All @@ -107,18 +111,43 @@ func NewDriver(opts ...Option) otlp.ProtocolDriver {
if cfg.backoff <= 0 {
cfg.backoff = DefaultBackoff
}
client := &http.Client{

metricsClient := &http.Client{
Transport: ourTransport,
Timeout: cfg.metrics.timeout,
}
if cfg.metrics.tlsCfg != nil {
transport := ourTransport.Clone()
transport.TLSClientConfig = cfg.metrics.tlsCfg
metricsClient.Transport = transport
}

tracesClient := &http.Client{
Transport: ourTransport,
Timeout: cfg.traces.timeout,
}
if cfg.tlsCfg != nil {
if cfg.traces.tlsCfg != nil {
transport := ourTransport.Clone()
transport.TLSClientConfig = cfg.tlsCfg
client.Transport = transport
transport.TLSClientConfig = cfg.traces.tlsCfg
tracesClient.Transport = transport
}

stopCh := make(chan struct{})
return &driver{
client: client,
tracesDriver: signalDriver{
cfg: cfg.traces,
generalCfg: cfg,
stopCh: stopCh,
client: tracesClient,
},
metricsDriver: signalDriver{
cfg: cfg.metrics,
generalCfg: cfg,
stopCh: stopCh,
client: metricsClient,
},
cfg: cfg,
stopCh: make(chan struct{}),
stopCh: stopCh,
}
}

Expand Down Expand Up @@ -150,7 +179,7 @@ func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet,
if err != nil {
return err
}
return d.send(ctx, rawRequest, d.cfg.metricsURLPath)
return d.metricsDriver.send(ctx, rawRequest)
}

// ExportTraces implements otlp.ProtocolDriver.
Expand All @@ -166,7 +195,7 @@ func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot)
if err != nil {
return err
}
return d.send(ctx, rawRequest, d.cfg.tracesURLPath)
return d.tracesDriver.send(ctx, rawRequest)
}

func (d *driver) marshal(msg proto.Message) ([]byte, error) {
Expand All @@ -176,12 +205,12 @@ func (d *driver) marshal(msg proto.Message) ([]byte, error) {
return proto.Marshal(msg)
}

func (d *driver) send(ctx context.Context, rawRequest []byte, urlPath string) error {
address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.endpoint, urlPath)
func (d *signalDriver) send(ctx context.Context, rawRequest []byte) error {
address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.endpoint, d.cfg.urlPath)
var cancel context.CancelFunc
ctx, cancel = d.contextWithStop(ctx)
defer cancel()
for i := 0; i < d.cfg.maxAttempts; i++ {
for i := 0; i < d.generalCfg.maxAttempts; i++ {
response, err := d.singleSend(ctx, rawRequest, address)
if err != nil {
return err
Expand All @@ -198,7 +227,7 @@ func (d *driver) send(ctx context.Context, rawRequest []byte, urlPath string) er
fallthrough
case http.StatusServiceUnavailable:
select {
case <-time.After(getWaitDuration(d.cfg.backoff, i)):
case <-time.After(getWaitDuration(d.generalCfg.backoff, i)):
continue
case <-ctx.Done():
return ctx.Err()
Expand All @@ -207,10 +236,10 @@ func (d *driver) send(ctx context.Context, rawRequest []byte, urlPath string) er
return fmt.Errorf("failed with HTTP status %s", response.Status)
}
}
return fmt.Errorf("failed to send data to %s after %d tries", address, d.cfg.maxAttempts)
return fmt.Errorf("failed to send data to %s after %d tries", address, d.generalCfg.maxAttempts)
}

func (d *driver) getScheme() string {
func (d *signalDriver) getScheme() string {
if d.cfg.insecure {
return "http"
}
Expand All @@ -237,7 +266,7 @@ func getWaitDuration(backoff time.Duration, i int) time.Duration {
return (time.Duration)(k)*backoff + (time.Duration)(jitter)
}

func (d *driver) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) {
func (d *signalDriver) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) {
// Unify the parent context Done signal with the driver's stop
// channel.
ctx, cancel := context.WithCancel(ctx)
Expand All @@ -253,7 +282,7 @@ func (d *driver) contextWithStop(ctx context.Context) (context.Context, context.
return ctx, cancel
}

func (d *driver) singleSend(ctx context.Context, rawRequest []byte, address string) (*http.Response, error) {
func (d *signalDriver) singleSend(ctx context.Context, rawRequest []byte, address string) (*http.Response, error) {
request, err := http.NewRequestWithContext(ctx, http.MethodPost, address, nil)
if err != nil {
return nil, err
Expand All @@ -271,14 +300,14 @@ func (d *driver) singleSend(ctx context.Context, rawRequest []byte, address stri
return d.client.Do(request)
}

func (d *driver) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Header) {
func (d *signalDriver) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Header) {
var bodyReader io.ReadCloser
headers := http.Header{}
for k, v := range d.cfg.headers {
headers.Set(k, v)
}
contentLength := (int64)(len(rawRequest))
if d.cfg.marshaler == MarshalJSON {
if d.generalCfg.marshaler == MarshalJSON {
headers.Set("Content-Type", contentTypeJSON)
} else {
headers.Set("Content-Type", contentTypeProto)
Expand Down
22 changes: 22 additions & 0 deletions exporters/otlp/otlphttp/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package otlphttp_test
import (
"context"
"net/http"
"os"
"testing"
"time"

Expand Down Expand Up @@ -167,6 +168,27 @@ func TestRetry(t *testing.T) {
assert.Len(t, mc.GetSpans(), 1)
}

func TestTimeout(t *testing.T) {
mcCfg := mockCollectorConfig{
InjectDelay: 100 * time.Millisecond,
}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlphttp.NewDriver(
otlphttp.WithEndpoint(mc.Endpoint()),
otlphttp.WithInsecure(),
otlphttp.WithTimeout(50*time.Millisecond),
)
ctx := context.Background()
exporter, err := otlp.NewExporter(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot())
assert.Equal(t, true, os.IsTimeout(err))
}

func TestRetryFailed(t *testing.T) {
statuses := []int{
http.StatusTooManyRequests,
Expand Down
132 changes: 132 additions & 0 deletions exporters/otlp/otlphttp/envconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otlphttp

import (
"fmt"
"net/url"
"strconv"
"strings"
"time"
)

func applyEnvConfigs(cfg *config, getEnv func(string) string) *config {
opts := getOptionsFromEnv(getEnv)
for _, opt := range opts {
opt.Apply(cfg)
}
return cfg
}

func getOptionsFromEnv(env func(string) string) []Option {
var opts []Option

// Endpoint
if v, ok := getEnv(env, "ENDPOINT"); ok {
opts = append(opts, WithEndpoint(v))
}
if v, ok := getEnv(env, "TRACES_ENDPOINT"); ok {
opts = append(opts, WithTracesEndpoint(v))
}
if v, ok := getEnv(env, "METRICS_ENDPOINT"); ok {
opts = append(opts, WithMetricsEndpoint(v))
}

// Certificate File
// TODO: add certificate file env config support

// Headers
if h, ok := getEnv(env, "HEADERS"); ok {
opts = append(opts, WithHeaders(stringToHeader(h)))
}
if h, ok := getEnv(env, "TRACES_HEADERS"); ok {
opts = append(opts, WithTracesHeaders(stringToHeader(h)))
}
if h, ok := getEnv(env, "METRICS_HEADERS"); ok {
opts = append(opts, WithMetricsHeaders(stringToHeader(h)))
}

// Compression
if c, ok := getEnv(env, "COMPRESSION"); ok {
opts = append(opts, WithCompression(stringToCompression(c)))
}
if c, ok := getEnv(env, "TRACES_COMPRESSION"); ok {
opts = append(opts, WithTracesCompression(stringToCompression(c)))
}
if c, ok := getEnv(env, "METRICS_COMPRESSION"); ok {
opts = append(opts, WithMetricsCompression(stringToCompression(c)))
}

// Timeout
if t, ok := getEnv(env, "TIMEOUT"); ok {
if d, err := strconv.Atoi(t); err == nil {
opts = append(opts, WithTimeout(time.Duration(d)*time.Millisecond))
}
}
if t, ok := getEnv(env, "TRACES_TIMEOUT"); ok {
if d, err := strconv.Atoi(t); err == nil {
opts = append(opts, WithTracesTimeout(time.Duration(d)*time.Millisecond))
}
}
if t, ok := getEnv(env, "METRICS_TIMEOUT"); ok {
if d, err := strconv.Atoi(t); err == nil {
opts = append(opts, WithMetricsTimeout(time.Duration(d)*time.Millisecond))
}
}

return opts
}

// getEnv gets an OTLP environment variable value of the specified key using the env function.
// This function already prepends the OTLP prefix to all key lookup.
func getEnv(env func(string) string, key string) (string, bool) {
v := strings.TrimSpace(env(fmt.Sprintf("OTEL_EXPORTER_OTLP_%s", key)))
return v, v != ""
}

func stringToCompression(value string) Compression {
switch value {
case "gzip":
return GzipCompression
}

return NoCompression
}

func stringToHeader(value string) map[string]string {
Comment thread
paivagustavo marked this conversation as resolved.
headersPairs := strings.Split(value, ",")
headers := make(map[string]string)

for _, header := range headersPairs {
nameValue := strings.SplitN(header, "=", 2)
if len(nameValue) < 2 {
continue
}
name, err := url.QueryUnescape(nameValue[0])
if err != nil {
continue
}
trimmedName := strings.TrimSpace(name)
value, err := url.QueryUnescape(nameValue[1])
if err != nil {
continue
}
trimmedValue := strings.TrimSpace(value)

headers[trimmedName] = trimmedValue
}

return headers
}
Loading