diff --git a/router-tests/telemetry/attribute_processor_test.go b/router-tests/telemetry/attribute_processor_test.go index 9d9b6041ce..f567a13032 100644 --- a/router-tests/telemetry/attribute_processor_test.go +++ b/router-tests/telemetry/attribute_processor_test.go @@ -188,33 +188,6 @@ func TestAttributeProcessorIntegration(t *testing.T) { }) }) - t.Run("invalid UTF-8 export error logs config hint", func(t *testing.T) { - t.Parallel() - - testenv.Run(t, &testenv.Config{ - TraceExporter: &invalidUTF8Exporter{}, - LogObservation: testenv.LogObservationConfig{ - Enabled: true, - LogLevel: zapcore.ErrorLevel, - }, - }, func(t *testing.T, xEnv *testenv.Environment) { - require.Eventually(t, func() bool { - res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ - Query: `query { employees { id } }`, - }) - if res.Response.StatusCode != 200 { - return false - } - logs := xEnv.Observer().FilterMessageSnippet("sanitize_utf8").All() - return len(logs) > 0 - }, 10*time.Second, 500*time.Millisecond) - - logs := xEnv.Observer().FilterMessageSnippet("sanitize_utf8").All() - require.NotEmpty(t, logs) - require.Equal(t, "otel error: traces export: string field contains invalid UTF-8: Enable 'telemetry.tracing.sanitize_utf8.enabled' in your config to sanitize invalid UTF-8 attributes.", logs[0].Message) - }) - }) - t.Run("IPAnonymization hashes IP attributes", func(t *testing.T) { t.Parallel() @@ -250,6 +223,31 @@ func TestAttributeProcessorIntegration(t *testing.T) { require.Positive(t, hashedIPCount) }) }) + + t.Run("invalid UTF-8 export error logs config hint", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + TraceExporter: &invalidUTF8Exporter{}, + LogObservation: testenv.LogObservationConfig{ + Enabled: true, + LogLevel: zapcore.ErrorLevel, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + _, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{ + Query: `query { employees { id } }`, + }) + require.NoError(t, err) + require.Eventually(t, func() bool { + logs := xEnv.Observer().FilterMessageSnippet("sanitize_utf8").All() + return len(logs) > 0 + }, 10*time.Second, 500*time.Millisecond) + + logs := xEnv.Observer().FilterMessageSnippet("sanitize_utf8").All() + require.NotEmpty(t, logs) + require.Equal(t, "otel error: traces export: string field contains invalid UTF-8: Enable 'telemetry.tracing.sanitize_utf8.enabled' in your config to sanitize invalid UTF-8 attributes.", logs[0].Message) + }) + }) } // errInvalidUTF8 mimics google.golang.org/protobuf/internal/impl.errInvalidUTF8 diff --git a/router/pkg/trace/meter.go b/router/pkg/trace/meter.go index 69cef72beb..394543aa8e 100644 --- a/router/pkg/trace/meter.go +++ b/router/pkg/trace/meter.go @@ -169,7 +169,13 @@ func NewTracerProvider(ctx context.Context, config *ProviderConfig) (*sdktrace.T // Either memory exporter or the configured exporters are used. if config.MemoryExporter != nil { - opts = append(opts, sdktrace.WithSyncer(config.MemoryExporter)) + // Use a custom span processor that routes export errors through the + // instance's logger instead of the global otel.Handle. This avoids + // flaky tests when parallel tests overwrite the global error handler. + opts = append(opts, sdktrace.WithSpanProcessor(&syncSpanProcessor{ + exporter: config.MemoryExporter, + handler: errHandler(config), + })) } else { for _, exp := range config.Config.Exporters { if exp.Disabled { diff --git a/router/pkg/trace/syncspan_test_exporter.go b/router/pkg/trace/syncspan_test_exporter.go new file mode 100644 index 0000000000..f86203fd7f --- /dev/null +++ b/router/pkg/trace/syncspan_test_exporter.go @@ -0,0 +1,33 @@ +package trace + +import ( + "context" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +// syncSpanProcessor exports spans synchronously and routes errors through a +// local handler instead of the global otel.Handle. This is equivalent to +// sdktrace.WithSyncer but avoids global error handler races in parallel tests. +type syncSpanProcessor struct { + exporter sdktrace.SpanExporter + handler func(error) +} + +func (p *syncSpanProcessor) OnStart(_ context.Context, _ sdktrace.ReadWriteSpan) {} + +func (p *syncSpanProcessor) OnEnd(s sdktrace.ReadOnlySpan) { + if !s.SpanContext().IsSampled() { + return + } + if err := p.exporter.ExportSpans(context.Background(), []sdktrace.ReadOnlySpan{s}); err != nil { + p.handler(err) + } +} + +func (p *syncSpanProcessor) Shutdown(ctx context.Context) error { + return p.exporter.Shutdown(ctx) +} + +func (p *syncSpanProcessor) ForceFlush(ctx context.Context) error { + return nil +}