Skip to content
116 changes: 116 additions & 0 deletions router-tests/structured_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -2867,6 +2868,121 @@ func TestFlakyAccessLogs(t *testing.T) {
})

t.Run("verify subgraph expressions", func(t *testing.T) {
t.Parallel()

t.Run("verify data source fetch duration value is attached", func(t *testing.T) {
t.Parallel()

testenv.Run(t, &testenv.Config{
SubgraphAccessLogsEnabled: true,
SubgraphAccessLogFields: []config.CustomAttribute{
{
Key: "data_source_fetch_duration",
ValueFrom: &config.CustomDynamicAttribute{
Expression: "subgraph.request.clientTrace.dataSourceFetchDuration",
},
},
},
Comment thread
SkArchon marked this conversation as resolved.
LogObservation: testenv.LogObservationConfig{
Enabled: true,
LogLevel: zapcore.InfoLevel,
},
}, func(t *testing.T, xEnv *testenv.Environment) {
xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query myQuery { employees { id } }`,
})
requestLog := xEnv.Observer().FilterMessage("/graphql")
requestLogAll := requestLog.All()
requestContextMap := requestLogAll[0].ContextMap()

dataSourceFetchDuration, ok := requestContextMap["data_source_fetch_duration"].(time.Duration)
require.True(t, ok)
require.Greater(t, int(dataSourceFetchDuration), 0)
})
})

t.Run("verify data source fetch duration value is attached for multiple subgraph calls", func(t *testing.T) {
t.Parallel()

testenv.Run(t, &testenv.Config{
SubgraphAccessLogsEnabled: true,
SubgraphAccessLogFields: []config.CustomAttribute{
{
Key: "data_source_fetch_duration",
ValueFrom: &config.CustomDynamicAttribute{
Expression: "subgraph.request.clientTrace.dataSourceFetchDuration",
},
},
},
LogObservation: testenv.LogObservationConfig{
Enabled: true,
LogLevel: zapcore.InfoLevel,
},
}, func(t *testing.T, xEnv *testenv.Environment) {
xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query myQuery { employees { id isAvailable } }`,
})
requestLog := xEnv.Observer().FilterMessage("/graphql")
requestLogAll := requestLog.All()

employeeSubgraphLogs := requestLogAll[0]
dataSourceFetchDuration1, ok := employeeSubgraphLogs.ContextMap()["data_source_fetch_duration"].(time.Duration)
require.True(t, ok)
require.Greater(t, int(dataSourceFetchDuration1), 0)

availabilitySubgraphLogs := requestLogAll[1]
dataSourceFetchDuration2, ok := availabilitySubgraphLogs.ContextMap()["data_source_fetch_duration"].(time.Duration)
require.True(t, ok)
require.Greater(t, int(dataSourceFetchDuration2), 0)
})
})

t.Run("verify data source fetch duration in conditional expression", func(t *testing.T) {
t.Parallel()

testenv.Run(t, &testenv.Config{
SubgraphAccessLogsEnabled: true,
SubgraphAccessLogFields: []config.CustomAttribute{
{
Key: "data_source_fetch_duration",
ValueFrom: &config.CustomDynamicAttribute{
Expression: "subgraph.request.error != nil ? subgraph.request.clientTrace.dataSourceFetchDuration : ''",
},
},
},
LogObservation: testenv.LogObservationConfig{
Enabled: true,
LogLevel: zapcore.InfoLevel,
},
Subgraphs: testenv.SubgraphsConfig{
Availability: testenv.SubgraphConfig{
Middleware: func(_ http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusForbidden)
_, _ = w.Write([]byte(`{"errors":[{"message":"Unauthorized","extensions":{"code":"UNAUTHORIZED"}}]}`))
})
},
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query myQuery { employees { id isAvailable } }`,
})
requestLog := xEnv.Observer().FilterMessage("/graphql")
requestLogAll := requestLog.All()

employeeSubgraphLogs := requestLogAll[0]
_, ok := employeeSubgraphLogs.ContextMap()["data_source_fetch_duration"]
require.False(t, ok)

availabilitySubgraphLogs := requestLogAll[1]
dataSourceFetchDuration2, ok := availabilitySubgraphLogs.ContextMap()["data_source_fetch_duration"].(float64)
require.True(t, ok)
require.Greater(t, int(dataSourceFetchDuration2), 0)
})
Comment thread
SkArchon marked this conversation as resolved.
Outdated
})

t.Run("verify connAcquireDuration value is attached", func(t *testing.T) {
t.Parallel()

Expand Down
54 changes: 53 additions & 1 deletion router-tests/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9795,7 +9795,7 @@ func TestFlakyTelemetry(t *testing.T) {
{
Key: "custom.subgraph",
ValueFrom: &config.CustomDynamicAttribute{
Expression: "string(subgraph.request.clientTrace.connAcquireDuration)",
Expression: "string(subgraph.request.clientTrace.connAcquireDuration.Seconds())",
},
},
},
Expand Down Expand Up @@ -9854,6 +9854,58 @@ func TestFlakyTelemetry(t *testing.T) {
require.False(t, ok)
})
})

t.Run("verify data source fetch duration value is attached for multiple subgraph calls", func(t *testing.T) {
t.Parallel()

exporter := tracetest.NewInMemoryExporter(t)
metricReader := metric.NewManualReader()
testenv.Run(t, &testenv.Config{
TraceExporter: exporter,
MetricReader: metricReader,
CustomTelemetryAttributes: []config.CustomAttribute{
{
Key: "data_source_fetch_duration.subgraph",
ValueFrom: &config.CustomDynamicAttribute{
Expression: "string(subgraph.request.clientTrace.dataSourceFetchDuration.Seconds())",
},
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query myQuery { employees { id isAvailable } }`,
})

sn := exporter.GetSpans().Snapshots()
require.Len(t, sn, 11)

var attributesDetected int

for i := 0; i < len(sn); i++ {
attributes := sn[i].Attributes()

if slices.Contains([]string{"Engine - Fetch"}, sn[i].Name()) {
for _, attributeEntry := range attributes {
if attributeEntry.Key == "data_source_fetch_duration.subgraph" {
attributesDetected++
valueString := attributeEntry.Value.AsString()
floatValue, err := strconv.ParseFloat(valueString, 64)
require.NoError(t, err)
require.Greater(t, floatValue, 0.0)
}
}
} else {
for _, attributeEntry := range attributes {
if attributeEntry.Key == "data_source_fetch_duration.subgraph" {
require.Fail(t, "data_source_fetch_duration.subgraph should not be present on non engine fetch spans")
}
}
}
}

require.Equal(t, 2, attributesDetected)
})
})
})

}
Expand Down
10 changes: 10 additions & 0 deletions router/core/engine_loader_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"slices"
"sync/atomic"
"time"

rcontext "github.com/wundergraph/cosmo/router/internal/context"
Expand Down Expand Up @@ -93,6 +94,9 @@ func (f *engineLoaderHooks) OnLoad(ctx context.Context, ds resolve.DataSourceInf

ctx = context.WithValue(ctx, rcontext.CurrentSubgraphContextKey{}, ds.Name)

duration := atomic.Int64{}
ctx = context.WithValue(ctx, rcontext.FetchTimingKey, &duration)

reqContext := getRequestContext(ctx)
if reqContext == nil {
return ctx
Expand Down Expand Up @@ -153,6 +157,12 @@ func (f *engineLoaderHooks) OnFinished(ctx context.Context, ds resolve.DataSourc
exprCtx.Subgraph.Name = ds.Name
exprCtx.Subgraph.Request.Error = WrapExprError(responseInfo.Err)

if value := ctx.Value(rcontext.FetchTimingKey); value != nil {
Comment thread
StarpTech marked this conversation as resolved.
if fetchTiming, ok := value.(*atomic.Int64); ok {
exprCtx.Subgraph.Request.ClientTrace.DataSourceFetchDuration = time.Duration(fetchTiming.Load())
}
}

if f.storeSubgraphResponseBody {
exprCtx.Subgraph.Response.Body.Raw = responseInfo.GetResponseBody()
}
Expand Down
27 changes: 26 additions & 1 deletion router/demo.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,29 @@ events:
redis:
- id: my-redis
urls:
- "redis://localhost:6379/2"
- "redis://localhost:6379/2"

telemetry:
tracing:
enabled: true
attributes:
- key: conn_duration
value_from:
expression: "string(subgraph.request.clientTrace.connAcquireDuration)"
exporters:
- endpoint: "http://localhost:4319"
exporter: http

access_logs:
subgraphs:
enabled: true
fields:
- key: fetch_duration
value_from:
expression: "subgraph.request.clientTrace.fetchDuration"
Comment thread
SkArchon marked this conversation as resolved.
Outdated
- key: conn_acquire_duration
value_from:
expression: "subgraph.request.clientTrace.connAcquireDuration"
- key: there
value_from:
expression: "subgraph.request.clientTrace.connAcquireDuration.String()"
1 change: 1 addition & 0 deletions router/internal/context/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ const (
RequestContextKey ContextKey = iota
SubgraphResolverContextKey
EngineLoaderHooksContextKey
FetchTimingKey
)
4 changes: 3 additions & 1 deletion router/internal/expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"net/url"
"time"

"github.com/expr-lang/expr/file"
"github.com/wundergraph/cosmo/router/pkg/authentication"
Expand Down Expand Up @@ -132,7 +133,8 @@ type SubgraphResponse struct {
}

type ClientTrace struct {
ConnectionAcquireDuration float64 `expr:"connAcquireDuration"`
DataSourceFetchDuration time.Duration `expr:"dataSourceFetchDuration"`
Comment thread
SkArchon marked this conversation as resolved.
Outdated
ConnectionAcquireDuration time.Duration `expr:"connAcquireDuration"`
}

// Subgraph Related
Expand Down
10 changes: 5 additions & 5 deletions router/internal/traceclient/traceclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package traceclient

import (
"context"
rcontext "github.com/wundergraph/cosmo/router/internal/context"
"github.com/wundergraph/cosmo/router/internal/expr"
"net/http"
"net/http/httptrace"
"time"

rcontext "github.com/wundergraph/cosmo/router/internal/context"
"github.com/wundergraph/cosmo/router/internal/expr"

"github.com/wundergraph/cosmo/router/pkg/metric"
rotel "github.com/wundergraph/cosmo/router/pkg/otel"
)
Expand Down Expand Up @@ -113,9 +114,7 @@ func (t *TraceInjectingRoundTripper) processConnectionMetrics(ctx context.Contex

if trace.ConnectionGet != nil && trace.ConnectionAcquired != nil {
duration := trace.ConnectionAcquired.Time.Sub(trace.ConnectionGet.Time)
connAcquireTime := float64(duration) / float64(time.Millisecond)

exprContext.Subgraph.Request.ClientTrace.ConnectionAcquireDuration = connAcquireTime
exprContext.Subgraph.Request.ClientTrace.ConnectionAcquireDuration = duration
Comment thread
StarpTech marked this conversation as resolved.

Comment thread
SkArchon marked this conversation as resolved.
serverAttributes := rotel.GetServerAttributes(trace.ConnectionGet.HostPort)
serverAttributes = append(
Expand All @@ -124,6 +123,7 @@ func (t *TraceInjectingRoundTripper) processConnectionMetrics(ctx context.Contex
rotel.WgSubgraphName.String(subgraph),
)

connAcquireTime := float64(duration) / float64(time.Millisecond)
t.connectionMetricStore.MeasureConnectionAcquireDuration(ctx,
connAcquireTime,
serverAttributes...)
Expand Down
11 changes: 11 additions & 0 deletions router/pkg/trace/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package trace

import (
"net/http"
"sync/atomic"
"time"

"github.com/wundergraph/cosmo/router/internal/context"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

Expand Down Expand Up @@ -36,8 +39,16 @@ func (t *transport) RoundTrip(r *http.Request) (*http.Response, error) {
t.handler(r)
}

startTime := time.Now()

res, err := t.rt.RoundTrip(r)

if value := r.Context().Value(context.FetchTimingKey); value != nil {
if fetchTiming, ok := value.(*atomic.Int64); ok {
fetchTiming.Add(int64(time.Since(startTime)))
}
}

// In case of a roundtrip error the span status is set to error by the otelhttp.RoundTrip function.
// Also, status code >= 500 is considered an error

Expand Down
Loading