Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 231 additions & 4 deletions router-tests/telemetry/span_error_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ package telemetry
import (
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/wundergraph/cosmo/router-tests/testenv"
"github.com/wundergraph/cosmo/router/pkg/config"
"github.com/wundergraph/cosmo/router/pkg/trace/tracetest"
"go.opentelemetry.io/otel/codes"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
otelsdktracetest "go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

Expand All @@ -26,10 +30,32 @@ func rootSpan(spans []sdktrace.ReadOnlySpan) sdktrace.ReadOnlySpan {
return spans[len(spans)-1]
}

// waitForStableSpans polls exporter.GetSpans() until the snapshot count stops
// changing for at least 200ms, guaranteeing the trace tree has finished exporting
// before assertions run across all spans.
func waitForStableSpans(t *testing.T, exporter *otelsdktracetest.InMemoryExporter) []sdktrace.ReadOnlySpan {
t.Helper()
var (
spans []sdktrace.ReadOnlySpan
lastCount = -1
stableSince time.Time
)
require.Eventually(t, func() bool {
spans = exporter.GetSpans().Snapshots()
if len(spans) != lastCount {
lastCount = len(spans)
stableSince = time.Now()
return false
}
return len(spans) > 0 && time.Since(stableSince) >= 200*time.Millisecond
}, 5*time.Second, 50*time.Millisecond, "expected span snapshot to stabilize")
return spans
}

func TestClientDisconnectionBehavior(t *testing.T) {
t.Parallel()

t.Run("span status is not error but exception event is recorded", func(t *testing.T) {
t.Run("root span is not marked as error but records exception event on client disconnect", func(t *testing.T) {
t.Parallel()

exporter := tracetest.NewInMemoryExporter(t)
Expand Down Expand Up @@ -74,7 +100,7 @@ func TestClientDisconnectionBehavior(t *testing.T) {
})
})

t.Run("error metrics are not inflated but request count is recorded", func(t *testing.T) {
t.Run("error metrics are not inflated but request count is recorded on client disconnect", func(t *testing.T) {
t.Parallel()

metricReader := sdkmetric.NewManualReader()
Expand Down Expand Up @@ -116,7 +142,7 @@ func TestClientDisconnectionBehavior(t *testing.T) {
})
})

t.Run("log level is info not error", func(t *testing.T) {
t.Run("context canceled is not logged at error level on client disconnect", func(t *testing.T) {
t.Parallel()

testenv.Run(t, &testenv.Config{
Expand Down Expand Up @@ -151,7 +177,208 @@ func TestClientDisconnectionBehavior(t *testing.T) {
})
})

t.Run("other errors still mark span as error", func(t *testing.T) {
t.Run("subgraph fetch span is not marked as error on client disconnect", func(t *testing.T) {
t.Parallel()

exporter := tracetest.NewInMemoryExporter(t)

testenv.Run(t, &testenv.Config{
TraceExporter: exporter,
LogObservation: testenv.LogObservationConfig{
Enabled: true,
LogLevel: zapcore.DebugLevel,
},
Subgraphs: testenv.SubgraphsConfig{
Employees: testenv.SubgraphConfig{
Delay: 2 * time.Second,
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
// Since the subgraph takes 2 seconds, and this takes 200 milliseconds which is less than the subgraph
// this will ensure that the request is cancelled due to a timeout
ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, xEnv.GraphQLRequestURL(),
strings.NewReader(`{"query":"{ employees { id } }"}`))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
require.Error(t, err)
require.Nil(t, resp, "client should not receive any response when it disconnects")
Comment thread
endigma marked this conversation as resolved.

spans := waitForStableSpans(t, exporter)

// No span in the entire trace should be marked as ERROR for client disconnections
for _, s := range spans {
require.NotEqual(t, codes.Error, s.Status().Code,
"span %q should not be marked as error when client disconnects", s.Name())
}

// Find the "Engine - Fetch" span and verify the cancellation is still recorded as an event
var fetchSpan sdktrace.ReadOnlySpan
for _, s := range spans {
if s.Name() == "Engine - Fetch" {
fetchSpan = s
break
}
}
require.NotNil(t, fetchSpan, "expected Engine - Fetch span to be exported")

hasExceptionEvent := false
for _, event := range fetchSpan.Events() {
if event.Name == "exception" {
hasExceptionEvent = true
break
}
}
require.True(t, hasExceptionEvent,
"subgraph fetch span should have an exception event recorded for client disconnections")

// The wg.request.error attribute should not be set on the fetch span
for _, attr := range fetchSpan.Attributes() {
if attr.Key == "wg.request.error" {
require.False(t, attr.Value.AsBool(),
"wg.request.error should not be true on fetch span for client disconnects")
}
}

// Verify no 500 status code was written — the server should not produce
// an error response when the client has disconnected
requestLogs := xEnv.Observer().FilterField(zap.Int("status", 500)).All()
require.Empty(t, requestLogs,
"server should not write a 500 response for client disconnections")
})
})

t.Run("persisted operation fetch span is not marked as error on client disconnect", func(t *testing.T) {
Comment thread
endigma marked this conversation as resolved.
t.Parallel()

exporter := tracetest.NewInMemoryExporter(t)

// Create a slow CDN server that delays persisted operation responses long enough
// for the client's request context to be canceled mid-fetch.
cdnServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/" {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`[]`))
return
}
time.Sleep(2 * time.Second)
w.WriteHeader(http.StatusNotFound)
}))
defer cdnServer.Close()

testenv.Run(t, &testenv.Config{
TraceExporter: exporter,
LogObservation: testenv.LogObservationConfig{
Enabled: true,
LogLevel: zapcore.DebugLevel,
},
CdnSever: cdnServer,
ModifyCDNConfig: func(cfg *config.CDNConfiguration) {
cfg.CacheSize = 0 // Disable cache so every request hits the CDN
},
}, func(t *testing.T, xEnv *testenv.Environment) {
ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, xEnv.GraphQLRequestURL(),
strings.NewReader(`{"operationName":"Employees","extensions":{"persistedQuery":{"version":1,"sha256Hash":"dc67510fb4289672bea757e862d6b00e83db5d3cbbcfb15260601b6f29bb2b8f"}}}`))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("graphql-client-name", "my-client")

client := &http.Client{}
resp, err := client.Do(req)
require.Error(t, err)
require.Nil(t, resp, "client should not receive any response when it disconnects")

spans := waitForStableSpans(t, exporter)

// No span should be marked as ERROR for client disconnections
for _, s := range spans {
require.NotEqual(t, codes.Error, s.Status().Code,
"span %q should not be marked as error when client disconnects during persisted op fetch", s.Name())
}

// Verify the "Load Persisted Operation" span exists and has the exception event
var poSpan sdktrace.ReadOnlySpan
for _, s := range spans {
if s.Name() == "Load Persisted Operation" {
poSpan = s
break
}
}
require.NotNil(t, poSpan, "expected Load Persisted Operation span to be exported")

hasExceptionEvent := false
for _, event := range poSpan.Events() {
if event.Name == "exception" {
hasExceptionEvent = true
break
}
}
require.True(t, hasExceptionEvent,
"Load Persisted Operation span should have an exception event for client disconnections")

// Verify no 500 status code was written
requestLogs := xEnv.Observer().FilterField(zap.Int("status", 500)).All()
require.Empty(t, requestLogs,
"server should not write a 500 response for client disconnections during persisted op fetch")
})
})

t.Run("batched request spans are not marked as error on client disconnect", func(t *testing.T) {
t.Parallel()

exporter := tracetest.NewInMemoryExporter(t)

testenv.Run(t, &testenv.Config{
TraceExporter: exporter,
LogObservation: testenv.LogObservationConfig{
Enabled: true,
LogLevel: zapcore.DebugLevel,
},
BatchingConfig: config.BatchingConfig{
Enabled: true,
MaxConcurrency: 10,
MaxEntriesPerBatch: 100,
},
Subgraphs: testenv.SubgraphsConfig{
Employees: testenv.SubgraphConfig{
Delay: 2 * time.Second,
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond)
defer cancel()

res, err := xEnv.MakeGraphQLBatchedRequestRequestWithContext(ctx, []testenv.GraphQLRequest{
{Query: `query employees { employees { id } }`},
{Query: `query employee { employees { isAvailable } }`},
}, nil)
require.Error(t, err)
require.Nil(t, res, "client should not receive any response when it disconnects")

spans := waitForStableSpans(t, exporter)

// No span should be marked as ERROR for client disconnections
for _, s := range spans {
require.NotEqual(t, codes.Error, s.Status().Code,
"span %q should not be marked as error when client disconnects during batch request", s.Name())
}

// Verify no 500 status code was written
requestLogs := xEnv.Observer().FilterField(zap.Int("status", 500)).All()
require.Empty(t, requestLogs,
"server should not write a 500 response for client disconnections during batch request")
})
})

t.Run("root span is marked as error on subgraph failure", func(t *testing.T) {
t.Parallel()

exporter := tracetest.NewInMemoryExporter(t)
Expand Down
5 changes: 3 additions & 2 deletions router-tests/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7737,7 +7737,7 @@ func TestFlakyTelemetry(t *testing.T) {
require.Equal(t, "Engine - Fetch", sn[8].Name())
require.Equal(t, trace.SpanKindInternal, sn[8].SpanKind())
require.Equal(t, codes.Error, sn[8].Status().Code)
require.Lenf(t, sn[8].Attributes(), 14, "expected 14 attributes, got %d", len(sn[8].Attributes()))
require.Lenf(t, sn[8].Attributes(), 15, "expected 15 attributes, got %d", len(sn[8].Attributes()))
require.Contains(t, sn[8].Status().Description, "connect: connection refused\nFailed to fetch from Subgraph 'products' at Path: 'employees'.")

events := sn[8].Events()
Expand Down Expand Up @@ -7809,7 +7809,7 @@ func TestFlakyTelemetry(t *testing.T) {
require.Equal(t, "Engine - Fetch", sn[8].Name())
require.Equal(t, trace.SpanKindInternal, sn[8].SpanKind())

require.Lenf(t, sn[8].Attributes(), 14, "expected 14 attributes, got %d", len(sn[6].Attributes()))
require.Lenf(t, sn[8].Attributes(), 15, "expected 15 attributes, got %d", len(sn[8].Attributes()))

given = attribute.NewSet(sn[8].Attributes()...)
want = attribute.NewSet([]attribute.KeyValue{
Expand All @@ -7827,6 +7827,7 @@ func TestFlakyTelemetry(t *testing.T) {
otel.WgOperationType.String("query"),
otel.WgOperationProtocol.String("http"),
otel.WgOperationHash.String("13939103824696605913"),
otel.WgRequestError.Bool(true),
}...)

require.True(t, given.Equals(&want))
Expand Down
24 changes: 17 additions & 7 deletions router-tests/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -1804,23 +1804,33 @@ func SetupCDNServer(t testing.TB) (cdnServer *httptest.Server, port int) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/" {
requestLog, err := json.Marshal(cdnRequestLog)
require.NoError(t, err)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(requestLog)
require.NoError(t, err)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
return
}

cdnRequestLog = append(cdnRequestLog, r.Method+" "+r.URL.Path)
// Ensure we have an authorization header with a valid token
authorization := r.Header.Get("Authorization")
if authorization == "" {
require.NotEmpty(t, authorization, "missing authorization header")
token, ok := strings.CutPrefix(authorization, "Bearer ")
if !ok {
http.Error(w, "missing or malformed Bearer token", http.StatusUnauthorized)
return
}
token := authorization[len("Bearer "):]
parsedClaims := make(jwt.MapClaims)
jwtParser := new(jwt.Parser)
_, _, err := jwtParser.ParseUnverified(token, parsedClaims)
require.NoError(t, err)
if _, _, err := jwtParser.ParseUnverified(token, parsedClaims); err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}
cdnFileServer.ServeHTTP(w, r)
})
cdnServer = httptest.NewServer(handler)
Expand Down
7 changes: 6 additions & 1 deletion router/core/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,12 @@ func processBatchedRequest(w http.ResponseWriter, r *http.Request, handlerOpts H
}

func processBatchError(w http.ResponseWriter, r *http.Request, err error, requestLogger *zap.Logger) {
ctrace.AttachErrToSpanFromContext(r.Context(), err)
if errors.Is(err, context.Canceled) {
span := trace.SpanFromContext(r.Context())
span.RecordError(err)
} else {
ctrace.AttachErrToSpanFromContext(r.Context(), err)
}

requestError := graphqlerrors.RequestError{
Message: err.Error(),
Expand Down
Loading
Loading