Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 25 additions & 27 deletions router-tests/telemetry/attribute_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,33 +188,6 @@ func TestAttributeProcessorIntegration(t *testing.T) {
})
})

t.Run("invalid UTF-8 export error logs config hint", func(t *testing.T) {
t.Parallel()

testenv.Run(t, &testenv.Config{
TraceExporter: &invalidUTF8Exporter{},
LogObservation: testenv.LogObservationConfig{
Enabled: true,
LogLevel: zapcore.ErrorLevel,
},
}, func(t *testing.T, xEnv *testenv.Environment) {
require.Eventually(t, func() bool {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query { employees { id } }`,
})
if res.Response.StatusCode != 200 {
return false
}
logs := xEnv.Observer().FilterMessageSnippet("sanitize_utf8").All()
return len(logs) > 0
}, 10*time.Second, 500*time.Millisecond)

logs := xEnv.Observer().FilterMessageSnippet("sanitize_utf8").All()
require.NotEmpty(t, logs)
require.Equal(t, "otel error: traces export: string field contains invalid UTF-8: Enable 'telemetry.tracing.sanitize_utf8.enabled' in your config to sanitize invalid UTF-8 attributes.", logs[0].Message)
})
})

t.Run("IPAnonymization hashes IP attributes", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -250,6 +223,31 @@ func TestAttributeProcessorIntegration(t *testing.T) {
require.Positive(t, hashedIPCount)
})
})

t.Run("invalid UTF-8 export error logs config hint", func(t *testing.T) {
t.Parallel()

testenv.Run(t, &testenv.Config{
TraceExporter: &invalidUTF8Exporter{},
LogObservation: testenv.LogObservationConfig{
Enabled: true,
LogLevel: zapcore.ErrorLevel,
},
}, func(t *testing.T, xEnv *testenv.Environment) {
_, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
Query: `query { employees { id } }`,
})
require.NoError(t, err)
Comment thread
SkArchon marked this conversation as resolved.
require.Eventually(t, func() bool {
logs := xEnv.Observer().FilterMessageSnippet("sanitize_utf8").All()
return len(logs) > 0
}, 10*time.Second, 500*time.Millisecond)

logs := xEnv.Observer().FilterMessageSnippet("sanitize_utf8").All()
require.NotEmpty(t, logs)
require.Equal(t, "otel error: traces export: string field contains invalid UTF-8: Enable 'telemetry.tracing.sanitize_utf8.enabled' in your config to sanitize invalid UTF-8 attributes.", logs[0].Message)
Comment thread
SkArchon marked this conversation as resolved.
})
})
}

// errInvalidUTF8 mimics google.golang.org/protobuf/internal/impl.errInvalidUTF8
Expand Down
8 changes: 7 additions & 1 deletion router/pkg/trace/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,13 @@ func NewTracerProvider(ctx context.Context, config *ProviderConfig) (*sdktrace.T

// Either memory exporter or the configured exporters are used.
if config.MemoryExporter != nil {
opts = append(opts, sdktrace.WithSyncer(config.MemoryExporter))
// Use a custom span processor that routes export errors through the
// instance's logger instead of the global otel.Handle. This avoids
// flaky tests when parallel tests overwrite the global error handler.
opts = append(opts, sdktrace.WithSpanProcessor(&syncSpanProcessor{
exporter: config.MemoryExporter,
handler: errHandler(config),
}))
} else {
for _, exp := range config.Config.Exporters {
if exp.Disabled {
Expand Down
33 changes: 33 additions & 0 deletions router/pkg/trace/syncspan_test_exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package trace

import (
"context"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// syncSpanProcessor exports spans synchronously and routes errors through a
// local handler instead of the global otel.Handle. This is equivalent to
// sdktrace.WithSyncer but avoids global error handler races in parallel tests.
type syncSpanProcessor struct {
exporter sdktrace.SpanExporter
handler func(error)
}

func (p *syncSpanProcessor) OnStart(_ context.Context, _ sdktrace.ReadWriteSpan) {}

func (p *syncSpanProcessor) OnEnd(s sdktrace.ReadOnlySpan) {
if !s.SpanContext().IsSampled() {
return
}
if err := p.exporter.ExportSpans(context.Background(), []sdktrace.ReadOnlySpan{s}); err != nil {
p.handler(err)
}
}

func (p *syncSpanProcessor) Shutdown(ctx context.Context) error {
return p.exporter.Shutdown(ctx)
}

func (p *syncSpanProcessor) ForceFlush(ctx context.Context) error {
return nil
}
Loading