diff --git a/.chloggen/telemetrygen_add_timeout_flag.yaml b/.chloggen/telemetrygen_add_timeout_flag.yaml new file mode 100644 index 0000000000000..ed7bf8a7531f9 --- /dev/null +++ b/.chloggen/telemetrygen_add_timeout_flag.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: cmd/telemetrygen + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add new `--timeout` flag to set timeout for telemetrygen calls" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [47203] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/telemetrygen/README.md b/cmd/telemetrygen/README.md index d4196ea0c490a..09ae41417e489 100644 --- a/cmd/telemetrygen/README.md +++ b/cmd/telemetrygen/README.md @@ -63,11 +63,19 @@ service: Starting OpenTelemetry collector via docker: ``` -docker run -p 4317:4317 -v $(pwd)/config.yaml:/etc/otelcol-contrib/config.yaml ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.86.0 +docker run -p 4317:4317 -v $(pwd)/config.yaml:/etc/otelcol-contrib/config.yaml ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.149.0 ``` Other options for running the collector are documented here https://opentelemetry.io/docs/collector/getting-started/ +## Export timeout + +The maximum time to wait for signals to reach the destination can be configured with the following flag: + +```console +--timeout=10s #Defaults to 10s +``` + ## Batching signals All telemetry signals have batching capability configurable with the following flags: diff --git a/cmd/telemetrygen/internal/config/config.go b/cmd/telemetrygen/internal/config/config.go index 7197a11e3c64c..1b734cb3d0ae9 100644 --- a/cmd/telemetrygen/internal/config/config.go +++ b/cmd/telemetrygen/internal/config/config.go @@ -146,6 +146,7 @@ type Config struct { TotalDuration types.DurationWithInf ReportingInterval time.Duration SkipSettingGRPCLogger bool + Timeout time.Duration // OTLP config CustomEndpoint string @@ -264,6 +265,7 @@ func (c *Config) CommonFlags(fs *pflag.FlagSet) { fs.Float64Var(&c.Rate, "rate", c.Rate, "Approximately how many metrics/spans/logs per second each worker should generate. Zero means no throttling.") fs.Var(&c.TotalDuration, "duration", "For how long to run the test. Use 'inf' for infinite duration.") fs.DurationVar(&c.ReportingInterval, "interval", c.ReportingInterval, "Reporting interval") + fs.DurationVar(&c.Timeout, "timeout", c.Timeout, "Maximum time to wait for the signals to reach destination.") fs.StringVar(&c.CustomEndpoint, "otlp-endpoint", c.CustomEndpoint, "Destination endpoint for exporting logs, metrics and traces") fs.BoolVar(&c.Insecure, "otlp-insecure", c.Insecure, "Whether to enable client transport security for the exporter's grpc or http connection") @@ -313,6 +315,7 @@ func (c *Config) SetDefaults() { c.Rate = 0 c.TotalDuration = types.DurationWithInf(0) c.ReportingInterval = 1 * time.Second + c.Timeout = 10 * time.Second c.CustomEndpoint = "" c.Insecure = false c.InsecureSkipVerify = false diff --git a/cmd/telemetrygen/internal/config/config_test.go b/cmd/telemetrygen/internal/config/config_test.go index ce192d0e177f6..c8db8c75737aa 100644 --- a/cmd/telemetrygen/internal/config/config_test.go +++ b/cmd/telemetrygen/internal/config/config_test.go @@ -5,11 +5,18 @@ package config import ( "testing" + "time" "github.com/stretchr/testify/assert" "go.opentelemetry.io/otel/attribute" ) +func TestSetDefaults_Timeout(t *testing.T) { + cfg := &Config{} + cfg.SetDefaults() + assert.Equal(t, 10*time.Second, cfg.Timeout) +} + func TestKeyValueSet(t *testing.T) { tests := []struct { flag string diff --git a/cmd/telemetrygen/pkg/logs/exporter.go b/cmd/telemetrygen/pkg/logs/exporter.go index b609bfa1d3fae..d803decbc44f8 100644 --- a/cmd/telemetrygen/pkg/logs/exporter.go +++ b/cmd/telemetrygen/pkg/logs/exporter.go @@ -35,6 +35,8 @@ func grpcExporterOptions(cfg *Config) ([]otlploggrpc.Option, error) { grpcExpOpt = append(grpcExpOpt, otlploggrpc.WithHeaders(cfg.GetHeaders())) } + grpcExpOpt = append(grpcExpOpt, otlploggrpc.WithTimeout(cfg.Timeout)) + return grpcExpOpt, nil } @@ -62,5 +64,7 @@ func httpExporterOptions(cfg *Config) ([]otlploghttp.Option, error) { httpExpOpt = append(httpExpOpt, otlploghttp.WithHeaders(cfg.GetHeaders())) } + httpExpOpt = append(httpExpOpt, otlploghttp.WithTimeout(cfg.Timeout)) + return httpExpOpt, nil } diff --git a/cmd/telemetrygen/pkg/logs/integration_test.go b/cmd/telemetrygen/pkg/logs/integration_test.go index 5537f7878a72b..d3842b7b18331 100644 --- a/cmd/telemetrygen/pkg/logs/integration_test.go +++ b/cmd/telemetrygen/pkg/logs/integration_test.go @@ -9,6 +9,9 @@ import ( "context" "fmt" "net" + "net/http" + "net/http/httptest" + "net/url" "os" "os/exec" "sync" @@ -19,7 +22,12 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/plog" collectorlogs "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" + sdklog "go.opentelemetry.io/otel/sdk/log" "google.golang.org/grpc" + + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/config" ) // mockLogsReceiver is a mock gRPC receiver for testing @@ -328,3 +336,116 @@ func TestTelemetrygenIntegration(t *testing.T) { assert.Len(t, logs, 4, "Should have received exactly 4 log batches with batching disabled") }) } + +func TestHTTPExporterOptions_Timeout(t *testing.T) { + for name, tc := range map[string]struct { + timeout time.Duration + handlerDelay time.Duration + expectError bool + }{ + "TimeoutElapsed": { + timeout: 50 * time.Millisecond, + handlerDelay: 500 * time.Millisecond, + expectError: true, + }, + "TimeoutNotElapsed": { + timeout: 500 * time.Millisecond, + expectError: false, + }, + "NoTimeout": { + timeout: 0, + expectError: false, + }, + } { + t.Run(name, func(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) { + if tc.handlerDelay > 0 { + time.Sleep(tc.handlerDelay) + } + })) + defer srv.Close() + srvURL, _ := url.Parse(srv.URL) + + cfg := Config{ + Config: config.Config{ + Insecure: true, + CustomEndpoint: srvURL.Host, + Timeout: tc.timeout, + }, + } + opts, err := httpExporterOptions(&cfg) + require.NoError(t, err) + + exp, err := otlploghttp.New(t.Context(), opts...) + require.NoError(t, err) + t.Cleanup(func() { _ = exp.Shutdown(t.Context()) }) + + err = exp.Export(t.Context(), []sdklog.Record{{}}) + if tc.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestGRPCExporterOptions_Timeout(t *testing.T) { + for name, tc := range map[string]struct { + timeout time.Duration + handlerDelay time.Duration + expectError bool + }{ + "TimeoutElapsed": { + timeout: 50 * time.Millisecond, + handlerDelay: 500 * time.Millisecond, + expectError: true, + }, + "TimeoutNotElapsed": { + timeout: 500 * time.Millisecond, + expectError: false, + }, + "NoTimeout": { + timeout: 0, + expectError: false, + }, + } { + t.Run(name, func(t *testing.T) { + srv := grpc.NewServer(grpc.UnaryInterceptor(func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + if tc.handlerDelay > 0 { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(tc.handlerDelay): + } + } + return handler(ctx, req) + })) + collectorlogs.RegisterGRPCServer(srv, &mockLogsReceiver{}) + lis, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + go srv.Serve(lis) //nolint:errcheck + defer srv.Stop() + + cfg := Config{ + Config: config.Config{ + Insecure: true, + CustomEndpoint: lis.Addr().String(), + Timeout: tc.timeout, + }, + } + opts, err := grpcExporterOptions(&cfg) + require.NoError(t, err) + exp, err := otlploggrpc.New(t.Context(), opts...) + require.NoError(t, err) + defer func() { _ = exp.Shutdown(t.Context()) }() + + err = exp.Export(t.Context(), []sdklog.Record{{}}) + if tc.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/cmd/telemetrygen/pkg/logs/logs.go b/cmd/telemetrygen/pkg/logs/logs.go index 371fd00efb1d8..401129fea1d70 100644 --- a/cmd/telemetrygen/pkg/logs/logs.go +++ b/cmd/telemetrygen/pkg/logs/logs.go @@ -124,6 +124,7 @@ func exporterFactory(cfg *Config, logger *zap.Logger) exporterFunc { func createExporter(cfg *Config, logger *zap.Logger) (sdklog.Exporter, error) { var exp sdklog.Exporter var err error + if cfg.UseHTTP { var exporterOpts []otlploghttp.Option diff --git a/cmd/telemetrygen/pkg/metrics/exporter.go b/cmd/telemetrygen/pkg/metrics/exporter.go index 862bba5ebf0e3..884fcaa61139f 100644 --- a/cmd/telemetrygen/pkg/metrics/exporter.go +++ b/cmd/telemetrygen/pkg/metrics/exporter.go @@ -35,6 +35,8 @@ func grpcExporterOptions(cfg *Config) ([]otlpmetricgrpc.Option, error) { grpcExpOpt = append(grpcExpOpt, otlpmetricgrpc.WithHeaders(cfg.GetHeaders())) } + grpcExpOpt = append(grpcExpOpt, otlpmetricgrpc.WithTimeout(cfg.Timeout)) + return grpcExpOpt, nil } @@ -62,5 +64,7 @@ func httpExporterOptions(cfg *Config) ([]otlpmetrichttp.Option, error) { httpExpOpt = append(httpExpOpt, otlpmetrichttp.WithHeaders(cfg.GetHeaders())) } + httpExpOpt = append(httpExpOpt, otlpmetrichttp.WithTimeout(cfg.Timeout)) + return httpExpOpt, nil } diff --git a/cmd/telemetrygen/pkg/metrics/integration_test.go b/cmd/telemetrygen/pkg/metrics/integration_test.go index 7559af78b86c9..86de751ceab0d 100644 --- a/cmd/telemetrygen/pkg/metrics/integration_test.go +++ b/cmd/telemetrygen/pkg/metrics/integration_test.go @@ -9,6 +9,9 @@ import ( "context" "fmt" "net" + "net/http" + "net/http/httptest" + "net/url" "os" "os/exec" "sync" @@ -19,7 +22,12 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pmetric" collectormetrics "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "google.golang.org/grpc" + + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/config" ) // mockMetricsReceiver is a mock gRPC receiver for testing @@ -310,3 +318,116 @@ func TestTelemetrygenIntegration(t *testing.T) { assert.Len(t, metrics, 2, "Should have received 2 batches with default batch size 100") }) } + +func TestHTTPExporterOptions_Timeout(t *testing.T) { + for name, tc := range map[string]struct { + timeout time.Duration + handlerDelay time.Duration + expectError bool + }{ + "TimeoutElapsed": { + timeout: 50 * time.Millisecond, + handlerDelay: 500 * time.Millisecond, + expectError: true, + }, + "TimeoutNotElapsed": { + timeout: 500 * time.Millisecond, + expectError: false, + }, + "NoTimeout": { + timeout: 0, + expectError: false, + }, + } { + t.Run(name, func(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) { + if tc.handlerDelay > 0 { + time.Sleep(tc.handlerDelay) + } + })) + defer srv.Close() + srvURL, _ := url.Parse(srv.URL) + + cfg := Config{ + Config: config.Config{ + Insecure: true, + CustomEndpoint: srvURL.Host, + Timeout: tc.timeout, + }, + } + opts, err := httpExporterOptions(&cfg) + require.NoError(t, err) + + exp, err := otlpmetrichttp.New(t.Context(), opts...) + require.NoError(t, err) + t.Cleanup(func() { _ = exp.Shutdown(t.Context()) }) + + err = exp.Export(t.Context(), &metricdata.ResourceMetrics{}) + if tc.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestGRPCExporterOptions_Timeout(t *testing.T) { + for name, tc := range map[string]struct { + timeout time.Duration + handlerDelay time.Duration + expectError bool + }{ + "TimeoutElapsed": { + timeout: 50 * time.Millisecond, + handlerDelay: 500 * time.Millisecond, + expectError: true, + }, + "TimeoutNotElapsed": { + timeout: 500 * time.Millisecond, + expectError: false, + }, + "NoTimeout": { + timeout: 0, + expectError: false, + }, + } { + t.Run(name, func(t *testing.T) { + srv := grpc.NewServer(grpc.UnaryInterceptor(func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + if tc.handlerDelay > 0 { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(tc.handlerDelay): + } + } + return handler(ctx, req) + })) + collectormetrics.RegisterGRPCServer(srv, &mockMetricsReceiver{}) + lis, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + go srv.Serve(lis) //nolint:errcheck + defer srv.Stop() + + cfg := Config{ + Config: config.Config{ + Insecure: true, + CustomEndpoint: lis.Addr().String(), + Timeout: tc.timeout, + }, + } + opts, err := grpcExporterOptions(&cfg) + require.NoError(t, err) + exp, err := otlpmetricgrpc.New(t.Context(), opts...) + require.NoError(t, err) + defer func() { _ = exp.Shutdown(t.Context()) }() + + err = exp.Export(t.Context(), &metricdata.ResourceMetrics{}) + if tc.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/cmd/telemetrygen/pkg/metrics/metrics.go b/cmd/telemetrygen/pkg/metrics/metrics.go index 3f72089a2414d..4c4aea9917272 100644 --- a/cmd/telemetrygen/pkg/metrics/metrics.go +++ b/cmd/telemetrygen/pkg/metrics/metrics.go @@ -125,6 +125,7 @@ func exporterFactory(cfg *Config, logger *zap.Logger) exporterFunc { func createExporter(cfg *Config, logger *zap.Logger) (sdkmetric.Exporter, error) { var exp sdkmetric.Exporter var err error + if cfg.UseHTTP { var exporterOpts []otlpmetrichttp.Option diff --git a/cmd/telemetrygen/pkg/traces/exporter.go b/cmd/telemetrygen/pkg/traces/exporter.go index 2db29fffbfe08..137f6bbcce0c6 100644 --- a/cmd/telemetrygen/pkg/traces/exporter.go +++ b/cmd/telemetrygen/pkg/traces/exporter.go @@ -35,6 +35,8 @@ func grpcExporterOptions(cfg *Config) ([]otlptracegrpc.Option, error) { grpcExpOpt = append(grpcExpOpt, otlptracegrpc.WithHeaders(cfg.GetHeaders())) } + grpcExpOpt = append(grpcExpOpt, otlptracegrpc.WithTimeout(cfg.Timeout)) + return grpcExpOpt, nil } @@ -62,5 +64,7 @@ func httpExporterOptions(cfg *Config) ([]otlptracehttp.Option, error) { httpExpOpt = append(httpExpOpt, otlptracehttp.WithHeaders(cfg.GetHeaders())) } + httpExpOpt = append(httpExpOpt, otlptracehttp.WithTimeout(cfg.Timeout)) + return httpExpOpt, nil } diff --git a/cmd/telemetrygen/pkg/traces/integration_test.go b/cmd/telemetrygen/pkg/traces/integration_test.go index 11b35d7f860f7..26f8651e91c50 100644 --- a/cmd/telemetrygen/pkg/traces/integration_test.go +++ b/cmd/telemetrygen/pkg/traces/integration_test.go @@ -9,6 +9,9 @@ import ( "context" "fmt" "net" + "net/http" + "net/http/httptest" + "net/url" "os" "os/exec" "sync" @@ -19,7 +22,12 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/ptrace" collectortraces "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + tracepb "go.opentelemetry.io/proto/otlp/trace/v1" "google.golang.org/grpc" + + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/config" ) // mockTracesReceiver is a mock gRPC receiver for testing @@ -251,3 +259,114 @@ func TestTelemetrygenIntegration(t *testing.T) { assert.NotEmpty(t, traces, "Should have generated some traces") }) } + +func TestHTTPExporterOptions_Timeout(t *testing.T) { + for name, tc := range map[string]struct { + timeout time.Duration + handlerDelay time.Duration + expectError bool + }{ + "TimeoutElapsed": { + timeout: 50 * time.Millisecond, + handlerDelay: 500 * time.Millisecond, + expectError: true, + }, + "TimeoutNotElapsed": { + timeout: 500 * time.Millisecond, + expectError: false, + }, + "NoTimeout": { + timeout: 0, + expectError: false, + }, + } { + t.Run(name, func(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) { + if tc.handlerDelay > 0 { + time.Sleep(tc.handlerDelay) + } + })) + defer srv.Close() + srvURL, _ := url.Parse(srv.URL) + + cfg := Config{ + Config: config.Config{ + Insecure: true, + CustomEndpoint: srvURL.Host, + Timeout: tc.timeout, + }, + } + opts, err := httpExporterOptions(&cfg) + require.NoError(t, err) + client := otlptracehttp.NewClient(opts...) + + err = client.UploadTraces(t.Context(), []*tracepb.ResourceSpans{{}}) + if tc.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestGRPCExporterOptions_Timeout(t *testing.T) { + for name, tc := range map[string]struct { + timeout time.Duration + handlerDelay time.Duration + expectError bool + }{ + "TimeoutElapsed": { + timeout: 50 * time.Millisecond, + handlerDelay: 500 * time.Millisecond, + expectError: true, + }, + "TimeoutNotElapsed": { + timeout: 500 * time.Millisecond, + expectError: false, + }, + "NoTimeout": { + timeout: 0, + expectError: false, + }, + } { + t.Run(name, func(t *testing.T) { + srv := grpc.NewServer(grpc.UnaryInterceptor(func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + if tc.handlerDelay > 0 { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(tc.handlerDelay): + } + } + return handler(ctx, req) + })) + collectortraces.RegisterGRPCServer(srv, &mockTracesReceiver{}) + lis, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + go srv.Serve(lis) //nolint:errcheck + defer srv.Stop() + + cfg := Config{ + Config: config.Config{ + Insecure: true, + CustomEndpoint: lis.Addr().String(), + Timeout: tc.timeout, + }, + } + opts, err := grpcExporterOptions(&cfg) + require.NoError(t, err) + client := otlptracegrpc.NewClient(opts...) + err = client.Start(t.Context()) + require.NoError(t, err) + defer func() { _ = client.Stop(t.Context()) }() + + err = client.UploadTraces(t.Context(), []*tracepb.ResourceSpans{{}}) + if tc.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +}