Skip to content
Closed
4 changes: 2 additions & 2 deletions router-tests/error_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func TestAllowedExtensions(t *testing.T) {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `{ employees { id } }`,
})
require.Equal(t, `{"errors":[{"message":"Unauthorized","extensions":{"code":"UNAUTHORIZED","reason":"expired","details":"token expired","statusCode":403}}],"data":{"employees":null}}`, res.Body)
require.JSONEq(t, `{"errors":[{"message":"Unauthorized","extensions":{"code":"UNAUTHORIZED","reason":"expired","details":"token expired","statusCode":403}}],"data":{"employees":null}}`, res.Body)
})
})

Expand All @@ -436,7 +436,7 @@ func TestAllowedExtensions(t *testing.T) {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `{ employees { id } }`,
})
require.Equal(t, `{"errors":[{"message":"Unauthorized","extensions":{"code":"UNAUTHORIZED","reason":"expired","details":"token expired","statusCode":403}}],"data":{"employees":null}}`, res.Body)
require.JSONEq(t, `{"errors":[{"message":"Unauthorized","extensions":{"code":"UNAUTHORIZED","reason":"expired","details":"token expired","statusCode":403}}],"data":{"employees":null}}`, res.Body)
})
})

Expand Down
75 changes: 75 additions & 0 deletions router-tests/prometheus_parallel_subgraph_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package integration

import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/wundergraph/cosmo/router-tests/testenv"
"go.opentelemetry.io/otel/sdk/metric"
)

func TestPrometheusParallelSubgraphRequestDurationMetrics(t *testing.T) {
t.Parallel()

const productsDelay = 1200 * time.Millisecond

metricReader := metric.NewManualReader()
promRegistry := prometheus.NewRegistry()

testenv.Run(t, &testenv.Config{
MetricReader: metricReader,
PrometheusRegistry: promRegistry,
Subgraphs: testenv.SubgraphsConfig{
Products: testenv.SubgraphConfig{
Delay: productsDelay,
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query TestParallel2 {
employee(id: 1) {
id
}
productTypes {
... on Consultancy {
name
}
... on Cosmo {
name
}
}
}`,
})

require.Contains(t, res.Body, `"employee":{"id":1}`)
require.Contains(t, res.Body, `"productTypes"`)

metricFamily, err := promRegistry.Gather()
require.NoError(t, err)

requestDuration := findMetricFamilyByName(metricFamily, "router_http_request_duration_milliseconds")
require.NotNil(t, requestDuration)

employeesDurationMetrics := findMetricsByLabel(requestDuration, "wg_subgraph_name", "employees")
require.Len(t, employeesDurationMetrics, 1)

productsDurationMetrics := findMetricsByLabel(requestDuration, "wg_subgraph_name", "products")
require.Len(t, productsDurationMetrics, 1)

employeesHistogram := employeesDurationMetrics[0].GetHistogram()
productsHistogram := productsDurationMetrics[0].GetHistogram()
require.NotNil(t, employeesHistogram)
require.NotNil(t, productsHistogram)
require.EqualValues(t, 1, employeesHistogram.GetSampleCount())
require.EqualValues(t, 1, productsHistogram.GetSampleCount())

employeesDurationMs := employeesHistogram.GetSampleSum()
productsDurationMs := productsHistogram.GetSampleSum()

require.Greater(t, productsDurationMs, float64(productsDelay.Milliseconds()-250))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a source for flaky tests.

require.Less(t, employeesDurationMs, float64(productsDelay.Milliseconds()/2))
require.Greater(t, productsDurationMs-employeesDurationMs, 400.0)
})
}
15 changes: 10 additions & 5 deletions router/core/engine_loader_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func (f *engineLoaderHooks) OnFinished(ctx context.Context, ds resolve.DataSourc
}

latency := time.Since(hookCtx.startTime)

span := trace.SpanFromContext(ctx)
defer span.End()

Expand All @@ -175,11 +174,17 @@ func (f *engineLoaderHooks) OnFinished(ctx context.Context, ds resolve.DataSourc
exprCtx.Subgraph.Name = ds.Name
exprCtx.Subgraph.Request.Error = WrapExprError(responseInfo.Err)

var subgraphFetchLatency time.Duration
if value := ctx.Value(rcontext.FetchTimingKey); value != nil {
if fetchTiming, ok := value.(*atomic.Int64); ok {
exprCtx.Subgraph.Request.ClientTrace.FetchDuration = time.Duration(fetchTiming.Load())
subgraphFetchLatency = time.Duration(fetchTiming.Load())
exprCtx.Subgraph.Request.ClientTrace.FetchDuration = subgraphFetchLatency
}
}
// If there is no fetch timing available, use the total latency as the subgraph fetch latency
if subgraphFetchLatency == 0 {
subgraphFetchLatency = latency
}
Comment thread
alepane21 marked this conversation as resolved.

if f.storeSubgraphResponseBody {
exprCtx.Subgraph.Response.Body.Raw = responseInfo.GetResponseBody()
Expand Down Expand Up @@ -228,7 +233,7 @@ func (f *engineLoaderHooks) OnFinished(ctx context.Context, ds resolve.DataSourc
zap.String("subgraph_name", ds.Name),
zap.String("subgraph_id", ds.ID),
zap.Int("status", responseInfo.StatusCode),
zap.Duration("latency", latency),
zap.Duration("latency", subgraphFetchLatency),
}
path := ds.Name
if responseInfo.Request != nil {
Expand Down Expand Up @@ -301,10 +306,10 @@ func (f *engineLoaderHooks) OnFinished(ctx context.Context, ds resolve.DataSourc

attrOpt := otelmetric.WithAttributeSet(attribute.NewSet(metricAttrs...))
f.metricStore.MeasureRequestCount(ctx, metricSliceAttrs, attrOpt)
f.metricStore.MeasureLatency(ctx, latency, metricSliceAttrs, attrOpt)
f.metricStore.MeasureLatency(ctx, subgraphFetchLatency, metricSliceAttrs, attrOpt)
} else {
f.metricStore.MeasureRequestCount(ctx, reqContext.telemetry.metricSliceAttrs, metricAddOpt)
f.metricStore.MeasureLatency(ctx, latency, reqContext.telemetry.metricSliceAttrs, metricAddOpt)
f.metricStore.MeasureLatency(ctx, subgraphFetchLatency, reqContext.telemetry.metricSliceAttrs, metricAddOpt)
}

span.SetAttributes(traceAttrs...)
Expand Down
Loading