Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/telemetrygen_add_timeout_flag.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: cmd/telemetrygen

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add new `--timeout` flag to set timeout for telemetrygen calls"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [47203]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
10 changes: 9 additions & 1 deletion cmd/telemetrygen/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,19 @@ service:

Starting OpenTelemetry collector via docker:
```
docker run -p 4317:4317 -v $(pwd)/config.yaml:/etc/otelcol-contrib/config.yaml ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.86.0
docker run -p 4317:4317 -v $(pwd)/config.yaml:/etc/otelcol-contrib/config.yaml ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.149.0
```

Other options for running the collector are documented here https://opentelemetry.io/docs/collector/getting-started/

## Export timeout

The maximum time to wait for signals to reach the destination can be configured with the following flag:

```console
--timeout=10s #Defaults to 10s
```

## Batching signals

All telemetry signals have batching capability configurable with the following flags:
Expand Down
3 changes: 3 additions & 0 deletions cmd/telemetrygen/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ type Config struct {
TotalDuration types.DurationWithInf
ReportingInterval time.Duration
SkipSettingGRPCLogger bool
Timeout time.Duration

// OTLP config
CustomEndpoint string
Expand Down Expand Up @@ -264,6 +265,7 @@ func (c *Config) CommonFlags(fs *pflag.FlagSet) {
fs.Float64Var(&c.Rate, "rate", c.Rate, "Approximately how many metrics/spans/logs per second each worker should generate. Zero means no throttling.")
fs.Var(&c.TotalDuration, "duration", "For how long to run the test. Use 'inf' for infinite duration.")
fs.DurationVar(&c.ReportingInterval, "interval", c.ReportingInterval, "Reporting interval")
fs.DurationVar(&c.Timeout, "timeout", c.Timeout, "Maximum time to wait for the signals to reach destination.")

fs.StringVar(&c.CustomEndpoint, "otlp-endpoint", c.CustomEndpoint, "Destination endpoint for exporting logs, metrics and traces")
fs.BoolVar(&c.Insecure, "otlp-insecure", c.Insecure, "Whether to enable client transport security for the exporter's grpc or http connection")
Expand Down Expand Up @@ -313,6 +315,7 @@ func (c *Config) SetDefaults() {
c.Rate = 0
c.TotalDuration = types.DurationWithInf(0)
c.ReportingInterval = 1 * time.Second
c.Timeout = 10 * time.Second
c.CustomEndpoint = ""
c.Insecure = false
c.InsecureSkipVerify = false
Expand Down
7 changes: 7 additions & 0 deletions cmd/telemetrygen/internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@ package config

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
)

func TestSetDefaults_Timeout(t *testing.T) {
cfg := &Config{}
cfg.SetDefaults()
assert.Equal(t, 10*time.Second, cfg.Timeout)
}

func TestKeyValueSet(t *testing.T) {
tests := []struct {
flag string
Expand Down
4 changes: 4 additions & 0 deletions cmd/telemetrygen/pkg/logs/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ func grpcExporterOptions(cfg *Config) ([]otlploggrpc.Option, error) {
grpcExpOpt = append(grpcExpOpt, otlploggrpc.WithHeaders(cfg.GetHeaders()))
}

grpcExpOpt = append(grpcExpOpt, otlploggrpc.WithTimeout(cfg.Timeout))

return grpcExpOpt, nil
}

Expand Down Expand Up @@ -62,5 +64,7 @@ func httpExporterOptions(cfg *Config) ([]otlploghttp.Option, error) {
httpExpOpt = append(httpExpOpt, otlploghttp.WithHeaders(cfg.GetHeaders()))
}

httpExpOpt = append(httpExpOpt, otlploghttp.WithTimeout(cfg.Timeout))

return httpExpOpt, nil
}
121 changes: 121 additions & 0 deletions cmd/telemetrygen/pkg/logs/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"context"
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
"os/exec"
"sync"
Expand All @@ -19,7 +22,12 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/plog"
collectorlogs "go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
sdklog "go.opentelemetry.io/otel/sdk/log"
"google.golang.org/grpc"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/config"
)

// mockLogsReceiver is a mock gRPC receiver for testing
Expand Down Expand Up @@ -328,3 +336,116 @@ func TestTelemetrygenIntegration(t *testing.T) {
assert.Len(t, logs, 4, "Should have received exactly 4 log batches with batching disabled")
})
}

func TestHTTPExporterOptions_Timeout(t *testing.T) {
for name, tc := range map[string]struct {
timeout time.Duration
handlerDelay time.Duration
expectError bool
}{
"TimeoutElapsed": {
timeout: 50 * time.Millisecond,
handlerDelay: 500 * time.Millisecond,
expectError: true,
},
"TimeoutNotElapsed": {
timeout: 500 * time.Millisecond,
expectError: false,
},
"NoTimeout": {
timeout: 0,
expectError: false,
},
} {
t.Run(name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
if tc.handlerDelay > 0 {
time.Sleep(tc.handlerDelay)
}
}))
defer srv.Close()
srvURL, _ := url.Parse(srv.URL)

cfg := Config{
Config: config.Config{
Insecure: true,
CustomEndpoint: srvURL.Host,
Timeout: tc.timeout,
},
}
opts, err := httpExporterOptions(&cfg)
require.NoError(t, err)

exp, err := otlploghttp.New(t.Context(), opts...)
require.NoError(t, err)
t.Cleanup(func() { _ = exp.Shutdown(t.Context()) })

err = exp.Export(t.Context(), []sdklog.Record{{}})
if tc.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}

func TestGRPCExporterOptions_Timeout(t *testing.T) {
for name, tc := range map[string]struct {
timeout time.Duration
handlerDelay time.Duration
expectError bool
}{
"TimeoutElapsed": {
timeout: 50 * time.Millisecond,
handlerDelay: 500 * time.Millisecond,
expectError: true,
},
"TimeoutNotElapsed": {
timeout: 500 * time.Millisecond,
expectError: false,
},
"NoTimeout": {
timeout: 0,
expectError: false,
},
} {
t.Run(name, func(t *testing.T) {
srv := grpc.NewServer(grpc.UnaryInterceptor(func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
if tc.handlerDelay > 0 {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(tc.handlerDelay):
}
}
return handler(ctx, req)
}))
collectorlogs.RegisterGRPCServer(srv, &mockLogsReceiver{})
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
go srv.Serve(lis) //nolint:errcheck
defer srv.Stop()

cfg := Config{
Config: config.Config{
Insecure: true,
CustomEndpoint: lis.Addr().String(),
Timeout: tc.timeout,
},
}
opts, err := grpcExporterOptions(&cfg)
require.NoError(t, err)
exp, err := otlploggrpc.New(t.Context(), opts...)
require.NoError(t, err)
defer func() { _ = exp.Shutdown(t.Context()) }()

err = exp.Export(t.Context(), []sdklog.Record{{}})
if tc.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
1 change: 1 addition & 0 deletions cmd/telemetrygen/pkg/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func exporterFactory(cfg *Config, logger *zap.Logger) exporterFunc {
func createExporter(cfg *Config, logger *zap.Logger) (sdklog.Exporter, error) {
var exp sdklog.Exporter
var err error

if cfg.UseHTTP {
var exporterOpts []otlploghttp.Option

Expand Down
4 changes: 4 additions & 0 deletions cmd/telemetrygen/pkg/metrics/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ func grpcExporterOptions(cfg *Config) ([]otlpmetricgrpc.Option, error) {
grpcExpOpt = append(grpcExpOpt, otlpmetricgrpc.WithHeaders(cfg.GetHeaders()))
}

grpcExpOpt = append(grpcExpOpt, otlpmetricgrpc.WithTimeout(cfg.Timeout))

return grpcExpOpt, nil
}

Expand Down Expand Up @@ -62,5 +64,7 @@ func httpExporterOptions(cfg *Config) ([]otlpmetrichttp.Option, error) {
httpExpOpt = append(httpExpOpt, otlpmetrichttp.WithHeaders(cfg.GetHeaders()))
}

httpExpOpt = append(httpExpOpt, otlpmetrichttp.WithTimeout(cfg.Timeout))

return httpExpOpt, nil
}
Loading
Loading