diff --git a/router-tests/go.mod b/router-tests/go.mod index 43802d65a8..d08571d7fb 100644 --- a/router-tests/go.mod +++ b/router-tests/go.mod @@ -28,7 +28,7 @@ require ( github.com/wundergraph/cosmo/demo/pkg/subgraphs/projects v0.0.0-20250715110703-10f2e5f9c79e github.com/wundergraph/cosmo/router v0.0.0-20260319123623-f186a0f724f6 github.com/wundergraph/cosmo/router-plugin v0.0.0-20250808194725-de123ba1c65e - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.267 + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.267.0.20260403145725-997c9e210e8d go.opentelemetry.io/otel v1.39.0 go.opentelemetry.io/otel/sdk v1.39.0 go.opentelemetry.io/otel/sdk/metric v1.39.0 diff --git a/router-tests/go.sum b/router-tests/go.sum index 9bb30f878f..b297d72db2 100644 --- a/router-tests/go.sum +++ b/router-tests/go.sum @@ -357,8 +357,8 @@ github.com/wundergraph/astjson v1.1.0 h1:xORDosrZ87zQFJwNGe/HIHXqzpdHOFmqWgykCLV github.com/wundergraph/astjson v1.1.0/go.mod h1:h12D/dxxnedtLzsKyBLK7/Oe4TAoGpRVC9nDpDrZSWw= github.com/wundergraph/go-arena v1.1.0 h1:9+wSRkJAkA2vbYHp6s8tEGhPViRGQNGXqPHT0QzhdIc= github.com/wundergraph/go-arena v1.1.0/go.mod h1:ROOysEHWJjLQ8FSfNxZCziagb7Qw2nXY3/vgKRh7eWw= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.267 h1:qMkYR0oq0Cw61aDZs9VsCCVwNVSxRxT13ytz6WqCwJg= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.267/go.mod h1:HjTAO/cuICpu31IfHY9qmSPygx6Gza7Wt9hTSReTI+A= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.267.0.20260403145725-997c9e210e8d h1:tL7I0C+wcDhGVT5TMHShZ4LLdLf56rZEGHylf/dq0RA= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.267.0.20260403145725-997c9e210e8d/go.mod h1:HjTAO/cuICpu31IfHY9qmSPygx6Gza7Wt9hTSReTI+A= github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 h1:FnBeRrxr7OU4VvAzt5X7s6266i6cSVkkFPS0TuXWbIg= github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= diff --git a/router-tests/observability/prometheus_parallel_subgraph_metrics_test.go b/router-tests/observability/prometheus_parallel_subgraph_metrics_test.go new file mode 100644 index 0000000000..84735ec579 --- /dev/null +++ b/router-tests/observability/prometheus_parallel_subgraph_metrics_test.go @@ -0,0 +1,133 @@ +package integration + +import ( + "context" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" + "github.com/wundergraph/cosmo/router-tests/testenv" + "go.opentelemetry.io/otel/sdk/metric" + "google.golang.org/grpc" +) + +func requireSubgraphDurationHistogram( + t *testing.T, + metricFamilies []*io_prometheus_client.MetricFamily, + subgraphName string, +) *io_prometheus_client.Histogram { + t.Helper() + + requestDuration := findMetricFamilyByName(metricFamilies, "router_http_request_duration_milliseconds") + require.NotNil(t, requestDuration) + + durationMetrics := findMetricsByLabel(requestDuration, "wg_subgraph_name", subgraphName) + require.Len(t, durationMetrics, 1) + + histogram := durationMetrics[0].GetHistogram() + require.NotNil(t, histogram) + require.EqualValues(t, 1, histogram.GetSampleCount()) + + return histogram +} + +func TestPrometheusParallelSubgraphRequestDurationMetrics(t *testing.T) { + t.Parallel() + + const productsDelay = 1200 * time.Millisecond + + metricReader := metric.NewManualReader() + promRegistry := prometheus.NewRegistry() + + testenv.Run(t, &testenv.Config{ + MetricReader: metricReader, + PrometheusRegistry: promRegistry, + Subgraphs: testenv.SubgraphsConfig{ + Products: testenv.SubgraphConfig{ + Delay: productsDelay, + }, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query TestParallelSubgraphDurations { + employee(id: 1) { + id + } + productTypes { + ... on Consultancy { + name + } + ... on Cosmo { + name + } + } + }`, + }) + + require.Contains(t, res.Body, `"employee":{"id":1}`) + require.Contains(t, res.Body, `"productTypes"`) + + metricFamilies, err := promRegistry.Gather() + require.NoError(t, err) + + employeesDurationMs := requireSubgraphDurationHistogram(t, metricFamilies, "employees").GetSampleSum() + productsDurationMs := requireSubgraphDurationHistogram(t, metricFamilies, "products").GetSampleSum() + + require.GreaterOrEqual(t, productsDurationMs, float64(productsDelay.Milliseconds())) + require.Less(t, employeesDurationMs, float64(productsDelay.Milliseconds())/2) + }) +} + +func TestPrometheusParallelHTTPAndGRPCSubgraphRequestDurationMetrics(t *testing.T) { + t.Parallel() + + const projectsDelay = 900 * time.Millisecond + + metricReader := metric.NewManualReader() + promRegistry := prometheus.NewRegistry() + + testenv.Run(t, &testenv.Config{ + RouterConfigJSONTemplate: testenv.ConfigWithGRPCJSONTemplate, + EnableGRPC: true, + MetricReader: metricReader, + PrometheusRegistry: promRegistry, + Subgraphs: testenv.SubgraphsConfig{ + Projects: testenv.SubgraphConfig{ + GRPCInterceptor: func( + ctx context.Context, + req any, + _ *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (any, error) { + time.Sleep(projectsDelay) + return handler(ctx, req) + }, + }, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query TestParallelHTTPAndGRPCSubgraphDurations { + employees { + id + } + projects { + id + } + }`, + }) + + require.Contains(t, res.Body, `"employees":[`) + require.Contains(t, res.Body, `"projects":[`) + + metricFamilies, err := promRegistry.Gather() + require.NoError(t, err) + + employeesDurationMs := requireSubgraphDurationHistogram(t, metricFamilies, "employees").GetSampleSum() + projectsDurationMs := requireSubgraphDurationHistogram(t, metricFamilies, "projects").GetSampleSum() + + require.GreaterOrEqual(t, projectsDurationMs, float64(projectsDelay.Milliseconds())) + require.Less(t, employeesDurationMs, float64(projectsDelay.Milliseconds())/2) + }) +} diff --git a/router-tests/observability/structured_logging_grpc_subgraph_latency_test.go b/router-tests/observability/structured_logging_grpc_subgraph_latency_test.go new file mode 100644 index 0000000000..793bd2fab8 --- /dev/null +++ b/router-tests/observability/structured_logging_grpc_subgraph_latency_test.go @@ -0,0 +1,88 @@ +package integration + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/wundergraph/cosmo/router-tests/testenv" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" + "google.golang.org/grpc" +) + +func requireSubgraphLogContext( + t *testing.T, + entries []observer.LoggedEntry, + subgraphName string, +) map[string]interface{} { + t.Helper() + + for _, entry := range entries { + contextMap := entry.ContextMap() + if contextMap["log_type"] == "client/subgraph" && contextMap["subgraph_name"] == subgraphName { + return contextMap + } + } + + t.Fatalf("subgraph log for %q not found", subgraphName) + return nil +} + +func TestStructuredLoggingGRPCSubgraphLatency(t *testing.T) { + t.Parallel() + + const projectsDelay = 900 * time.Millisecond + + testenv.Run(t, &testenv.Config{ + RouterConfigJSONTemplate: testenv.ConfigWithGRPCJSONTemplate, + EnableGRPC: true, + SubgraphAccessLogsEnabled: true, + LogObservation: testenv.LogObservationConfig{ + Enabled: true, + LogLevel: zapcore.InfoLevel, + }, + Subgraphs: testenv.SubgraphsConfig{ + Projects: testenv.SubgraphConfig{ + GRPCInterceptor: func( + ctx context.Context, + req any, + _ *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (any, error) { + time.Sleep(projectsDelay) + return handler(ctx, req) + }, + }, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query TestStructuredLoggingGRPCSubgraphLatency { + employees { + id + } + projects { + id + } + }`, + }) + + require.Equal(t, res.Response.StatusCode, 200) + + logEntries := xEnv.Observer().All() + employeeContext := requireSubgraphLogContext(t, logEntries, "employees") + projectContext := requireSubgraphLogContext(t, logEntries, "projects") + + employeeLatency, ok := employeeContext["latency"].(time.Duration) + require.True(t, ok) + require.Greater(t, employeeLatency, time.Duration(0)) + + projectLatency, ok := projectContext["latency"].(time.Duration) + require.True(t, ok) + require.Greater(t, projectLatency, time.Duration(0)) + + // They should be different + require.NotEqual(t, projectLatency, employeeLatency) + }) +} diff --git a/router/core/engine_loader_hooks.go b/router/core/engine_loader_hooks.go index 1d284f06ed..6f5b2d1dfe 100644 --- a/router/core/engine_loader_hooks.go +++ b/router/core/engine_loader_hooks.go @@ -52,10 +52,6 @@ type engineLoaderHooks struct { headerPropagation *HeaderPropagation } -type engineLoaderHooksRequestContext struct { - startTime time.Time -} - func NewEngineRequestHooks( metricStore metric.Store, logger *requestlogger.SubgraphAccessLogger, @@ -97,8 +93,6 @@ func (f *engineLoaderHooks) OnLoad(ctx context.Context, ds resolve.DataSourceInf return ctx } - start := time.Now() - ctx = context.WithValue(ctx, rcontext.CurrentSubgraphContextKey{}, ds.Name) duration := atomic.Int64{} @@ -116,9 +110,7 @@ func (f *engineLoaderHooks) OnLoad(ctx context.Context, ds resolve.DataSourceInf }...), ) - return context.WithValue(ctx, rcontext.EngineLoaderHooksContextKey, &engineLoaderHooksRequestContext{ - startTime: start, - }) + return ctx } func (f *engineLoaderHooks) OnFinished(ctx context.Context, ds resolve.DataSourceInfo, responseInfo *resolve.ResponseInfo) { @@ -148,13 +140,6 @@ func (f *engineLoaderHooks) OnFinished(ctx context.Context, ds resolve.DataSourc return } - hookCtx, ok := ctx.Value(rcontext.EngineLoaderHooksContextKey).(*engineLoaderHooksRequestContext) - if !ok { - return - } - - latency := time.Since(hookCtx.startTime) - span := trace.SpanFromContext(ctx) defer span.End() @@ -175,11 +160,14 @@ func (f *engineLoaderHooks) OnFinished(ctx context.Context, ds resolve.DataSourc exprCtx.Subgraph.Name = ds.Name exprCtx.Subgraph.Request.Error = WrapExprError(responseInfo.Err) + var fetchLatency time.Duration if value := ctx.Value(rcontext.FetchTimingKey); value != nil { if fetchTiming, ok := value.(*atomic.Int64); ok { - exprCtx.Subgraph.Request.ClientTrace.FetchDuration = time.Duration(fetchTiming.Load()) + fetchLatency = time.Duration(fetchTiming.Load()) + exprCtx.Subgraph.Request.ClientTrace.FetchDuration = fetchLatency } } + loadLatency := responseInfo.LoadDuration if f.storeSubgraphResponseBody { exprCtx.Subgraph.Response.Body.Raw = responseInfo.GetResponseBody() @@ -228,7 +216,7 @@ func (f *engineLoaderHooks) OnFinished(ctx context.Context, ds resolve.DataSourc zap.String("subgraph_name", ds.Name), zap.String("subgraph_id", ds.ID), zap.Int("status", responseInfo.StatusCode), - zap.Duration("latency", latency), + zap.Duration("latency", loadLatency), } path := ds.Name if responseInfo.Request != nil { @@ -299,10 +287,10 @@ func (f *engineLoaderHooks) OnFinished(ctx context.Context, ds resolve.DataSourc attrOpt := otelmetric.WithAttributeSet(attribute.NewSet(metricAttrs...)) f.metricStore.MeasureRequestCount(ctx, metricSliceAttrs, attrOpt) - f.metricStore.MeasureLatency(ctx, latency, metricSliceAttrs, attrOpt) + f.metricStore.MeasureLatency(ctx, loadLatency, metricSliceAttrs, attrOpt) } else { f.metricStore.MeasureRequestCount(ctx, reqContext.telemetry.metricSliceAttrs, metricAddOpt) - f.metricStore.MeasureLatency(ctx, latency, reqContext.telemetry.metricSliceAttrs, metricAddOpt) + f.metricStore.MeasureLatency(ctx, loadLatency, reqContext.telemetry.metricSliceAttrs, metricAddOpt) } span.SetAttributes(traceAttrs...) diff --git a/router/go.mod b/router/go.mod index a2b565e403..e81c5e2f8a 100644 --- a/router/go.mod +++ b/router/go.mod @@ -31,7 +31,7 @@ require ( github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 github.com/twmb/franz-go v1.16.1 - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.267 + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.267.0.20260403145725-997c9e210e8d // Do not upgrade, it renames attributes we rely on go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 go.opentelemetry.io/contrib/propagators/b3 v1.23.0 diff --git a/router/go.sum b/router/go.sum index 3af92643bc..51b576ba7e 100644 --- a/router/go.sum +++ b/router/go.sum @@ -329,8 +329,8 @@ github.com/wundergraph/astjson v1.1.0 h1:xORDosrZ87zQFJwNGe/HIHXqzpdHOFmqWgykCLV github.com/wundergraph/astjson v1.1.0/go.mod h1:h12D/dxxnedtLzsKyBLK7/Oe4TAoGpRVC9nDpDrZSWw= github.com/wundergraph/go-arena v1.1.0 h1:9+wSRkJAkA2vbYHp6s8tEGhPViRGQNGXqPHT0QzhdIc= github.com/wundergraph/go-arena v1.1.0/go.mod h1:ROOysEHWJjLQ8FSfNxZCziagb7Qw2nXY3/vgKRh7eWw= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.267 h1:qMkYR0oq0Cw61aDZs9VsCCVwNVSxRxT13ytz6WqCwJg= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.267/go.mod h1:HjTAO/cuICpu31IfHY9qmSPygx6Gza7Wt9hTSReTI+A= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.267.0.20260403145725-997c9e210e8d h1:tL7I0C+wcDhGVT5TMHShZ4LLdLf56rZEGHylf/dq0RA= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.267.0.20260403145725-997c9e210e8d/go.mod h1:HjTAO/cuICpu31IfHY9qmSPygx6Gza7Wt9hTSReTI+A= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=