Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 82 additions & 19 deletions router-tests/timeout_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package integration

import (
"net/http"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/wundergraph/cosmo/router-tests/testenv"
"github.com/wundergraph/cosmo/router/core"
"github.com/wundergraph/cosmo/router/pkg/config"
"net/http"
"testing"
"time"
)

func TestTimeouts(t *testing.T) {
Expand All @@ -30,36 +32,73 @@ func TestTimeouts(t *testing.T) {
}
}`

t.Run("applies RequestTimeout", func(t *testing.T) {
t.Run("Per subgraph timeouts", func(t *testing.T) {
t.Parallel()

hobbySubgraphSleep := testenv.SubgraphsConfig{
Hobbies: testenv.SubgraphConfig{
Middleware: func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
time.Sleep(5 * time.Millisecond) // Slow response
w.Write([]byte("Hello, world!"))
})
subgraphSleep := func(hobbies, employees time.Duration) testenv.SubgraphsConfig {
return testenv.SubgraphsConfig{
Hobbies: testenv.SubgraphConfig{
Middleware: func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(hobbies)
handler.ServeHTTP(w, r)
})
},
},
},
Employees: testenv.SubgraphConfig{
Middleware: func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(employees) // Slow response
handler.ServeHTTP(w, r)
})
},
},
}
}

trafficConfig := config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
RequestTimeout: 500 * time.Millisecond,
RequestTimeout: 200 * time.Millisecond,
},
Subgraphs: map[string]*config.GlobalSubgraphRequestRule{
"hobbies": {
RequestTimeout: 3 * time.Millisecond,
RequestTimeout: 300 * time.Millisecond,
},
},
}
t.Run("applied subgraph timeout to request", func(t *testing.T) {

t.Run("no timeout on hobbies subgraph", func(t *testing.T) {
t.Parallel()

hobbiesDelay := 200 * time.Millisecond // 200ms is lower than the hobbies 300ms timeout
employeesDelay := 100 * time.Millisecond // 100ms is lower than the global 200ms timeout

testenv.Run(t, &testenv.Config{
Subgraphs: subgraphSleep(hobbiesDelay, employeesDelay),
RouterOptions: []core.Option{
core.WithSubgraphTransportOptions(
core.NewSubgraphTransportOptions(trafficConfig)),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: queryEmployeeWithHobby,
})

// It can also result in invalid JSON, but we don't care about that here
require.NotContains(t, res.Body, "Failed to fetch from Subgraph 'hobbies'")

require.Equal(t, `{"data":{"employee":{"id":1,"hobbies":[{},{"name":"Counter Strike"},{},{},{}]}}}`, res.Body)
})
})

t.Run("timeout on hobbies request", func(t *testing.T) {
t.Parallel()

hobbiesDelay := 500 * time.Millisecond // 500 is bigger than hobbies 300ms timeout
employeesDelay := 100 * time.Millisecond // 100ms is lower than the global 200ms timeout

testenv.Run(t, &testenv.Config{
Subgraphs: hobbySubgraphSleep,
Subgraphs: subgraphSleep(hobbiesDelay, employeesDelay),
RouterOptions: []core.Option{
core.WithSubgraphTransportOptions(
core.NewSubgraphTransportOptions(trafficConfig)),
Expand All @@ -74,11 +113,14 @@ func TestTimeouts(t *testing.T) {
})
})

t.Run("Subgraph timeout options don't affect unrelated subgraph", func(t *testing.T) {
t.Run("no timeout on employees subgraph", func(t *testing.T) {
t.Parallel()

hobbiesDelay := 500 * time.Millisecond // hobbies delay doesn't matter in this test case
employeesDelay := 100 * time.Millisecond // 100ms is lower than the global 200ms timeout

testenv.Run(t, &testenv.Config{
Subgraphs: hobbySubgraphSleep,
Subgraphs: subgraphSleep(hobbiesDelay, employeesDelay),
RouterOptions: []core.Option{
core.WithSubgraphTransportOptions(
core.NewSubgraphTransportOptions(trafficConfig)),
Expand All @@ -90,6 +132,27 @@ func TestTimeouts(t *testing.T) {
require.Equal(t, `{"data":{"employee":{"id":1}}}`, res.Body)
})
})

t.Run("timeout on employees subgraph", func(t *testing.T) {
t.Parallel()

hobbiesDelay := 500 * time.Millisecond // 500 is bigger than hobbies 300ms timeout
employeesDelay := 300 * time.Millisecond // 300ms is bigger than the global 200ms timeout

testenv.Run(t, &testenv.Config{
Subgraphs: subgraphSleep(hobbiesDelay, employeesDelay),
RouterOptions: []core.Option{
core.WithSubgraphTransportOptions(
core.NewSubgraphTransportOptions(trafficConfig)),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: queryEmployeeWithNoHobby,
})

require.Contains(t, res.Body, "Failed to fetch from Subgraph 'employees'")
})
})
})

t.Run("ResponseHeaderTimeout exceeded", func(t *testing.T) {
Expand Down
3 changes: 0 additions & 3 deletions router/core/factoryresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/http"
"net/url"
"slices"
"time"

"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/argument_templates"

Expand Down Expand Up @@ -41,7 +40,6 @@ type FactoryResolver interface {

type ApiTransportFactory interface {
RoundTripper(enableSingleFlight bool, transport http.RoundTripper) http.RoundTripper
DefaultTransportTimeout() time.Duration
DefaultHTTPProxyURL() *url.URL
}

Expand Down Expand Up @@ -73,7 +71,6 @@ func NewDefaultFactoryResolver(
) *DefaultFactoryResolver {

defaultHttpClient := &http.Client{
Timeout: transportFactory.DefaultTransportTimeout(),
Comment thread
devsergiy marked this conversation as resolved.
Transport: transportFactory.RoundTripper(enableSingleFlight, baseTransport),
}
streamingClient := &http.Client{
Expand Down
13 changes: 12 additions & 1 deletion router/core/timeout_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package core

import (
"context"
"go.uber.org/zap"
"net/http"

"go.uber.org/zap"
)

type TimeoutTransport struct {
Expand Down Expand Up @@ -40,14 +41,24 @@ func (tt *TimeoutTransport) RoundTrip(req *http.Request) (*http.Response, error)
return nil, nil
}
subgraph := rq.ActiveSubgraph(req)

if subgraph != nil && subgraph.Name != "" && tt.subgraphTrippers[subgraph.Name] != nil {
timeout := tt.opts.SubgraphMap[subgraph.Name].RequestTimeout
if timeout > 0 {
ctx, cancel := context.WithTimeout(req.Context(), timeout)
defer cancel()

return tt.subgraphTrippers[subgraph.Name].RoundTrip(req.WithContext(ctx))
}
return tt.subgraphTrippers[subgraph.Name].RoundTrip(req)
}

if tt.opts.RequestTimeout > 0 {
ctx, cancel := context.WithTimeout(req.Context(), tt.opts.RequestTimeout)
defer cancel()

return tt.defaultTransport.RoundTrip(req.WithContext(ctx))
}

return tt.defaultTransport.RoundTrip(req)
}
5 changes: 3 additions & 2 deletions router/core/timeout_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package core
import (
"context"
"crypto/tls"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestTimeoutTransport(t *testing.T) {
Expand Down
21 changes: 6 additions & 15 deletions router/core/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,35 @@ package core
import (
"bytes"
"fmt"
"go.opentelemetry.io/otel/propagation"
"io"
"net/http"
"net/url"
"sort"
"strconv"
"sync"
"time"

"go.opentelemetry.io/otel/propagation"

otelmetric "go.opentelemetry.io/otel/metric"

"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/httpclient"
"go.opentelemetry.io/otel/attribute"
sdktrace "go.opentelemetry.io/otel/sdk/trace"

semconv "go.opentelemetry.io/otel/semconv/v1.21.0"

"github.com/wundergraph/cosmo/router/pkg/metric"
"github.com/wundergraph/cosmo/router/pkg/otel"
"github.com/wundergraph/cosmo/router/pkg/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"

"github.com/wundergraph/cosmo/router/internal/docker"
"github.com/wundergraph/cosmo/router/internal/retrytransport"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
"github.com/wundergraph/graphql-go-tools/v2/pkg/pool"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
otrace "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

var (
defaultTimeout = 60 * time.Second
"github.com/wundergraph/cosmo/router/internal/docker"
"github.com/wundergraph/cosmo/router/internal/retrytransport"
)

type TransportPreHandler func(req *http.Request, ctx RequestContext) (*http.Request, *http.Response)
Expand Down Expand Up @@ -397,13 +395,6 @@ func (t TransportFactory) RoundTripper(enableSingleFlight bool, baseTransport ht
return tp
}

func (t TransportFactory) DefaultTransportTimeout() time.Duration {
if t.subgraphTransportOptions != nil {
return t.subgraphTransportOptions.RequestTimeout
}
return defaultTimeout
}

func (t TransportFactory) DefaultHTTPProxyURL() *url.URL {
return nil
}
Expand Down