Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 153 additions & 0 deletions router-tests/lifecycle/new_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package integration

import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/wundergraph/cosmo/demo/pkg/subgraphs"
"github.com/wundergraph/cosmo/router-tests/testenv"
"github.com/wundergraph/cosmo/router/core"
nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"
"github.com/wundergraph/cosmo/router/pkg/config"
natsPubsub "github.com/wundergraph/cosmo/router/pkg/pubsub/nats"
"go.uber.org/zap"
"google.golang.org/protobuf/encoding/protojson"
)

func TestNewServer(t *testing.T) {
t.Parallel()

t.Run("creates a working server without starting a listener", func(t *testing.T) {
t.Parallel()

rr, cleanup := setupNewServerTest(t)
defer cleanup()

svr, err := rr.NewServer(t.Context())
require.NoError(t, err)

svr.HealthChecks().SetReady(true)
ts := httptest.NewServer(svr.HttpServer().Handler)
defer ts.Close()

data, err := json.Marshal(testenv.GraphQLRequest{Query: `query { employees { id } }`})
require.NoError(t, err)

req, err := http.NewRequestWithContext(t.Context(), http.MethodPost, ts.URL+"/graphql", bytes.NewReader(data))
require.NoError(t, err)

response, err := testenv.MakeGraphQLRequestRawFromClient(req, &http.Client{})
require.NoError(t, err)

require.Equal(t, http.StatusOK, response.Response.StatusCode)
require.JSONEq(t, `{"data":{"employees":[{"id":1},{"id":2},{"id":3},{"id":4},{"id":5},{"id":7},{"id":8},{"id":10},{"id":11},{"id":12}]}}`, response.Body)
})

t.Run("health check endpoint is reachable", func(t *testing.T) {
t.Parallel()

rr, cleanup := setupNewServerTest(t)
defer cleanup()

svr, err := rr.NewServer(t.Context())
require.NoError(t, err)

svr.HealthChecks().SetReady(true)
ts := httptest.NewServer(svr.HttpServer().Handler)
defer ts.Close()

resp, err := http.Get(ts.URL + "/health/ready")
require.NoError(t, err)
defer func() {
_ = resp.Body.Close()
}()
require.Equal(t, http.StatusOK, resp.StatusCode)
})

t.Run("new server shutdown prevents further requests", func(t *testing.T) {
t.Parallel()

rr, cleanup := setupNewServerTest(t)
defer cleanup()

svr, err := rr.NewServer(t.Context())
require.NoError(t, err)

svr.HealthChecks().SetReady(true)
ts := httptest.NewServer(svr.HttpServer().Handler)

// Verify the server works before shutdown
data, err := json.Marshal(testenv.GraphQLRequest{Query: `query { employees { id } }`})
require.NoError(t, err)

req, err := http.NewRequestWithContext(t.Context(), http.MethodPost, ts.URL+"/graphql", bytes.NewReader(data))
require.NoError(t, err)

response1, err := testenv.MakeGraphQLRequestRawFromClient(req, &http.Client{})
require.NoError(t, err)

require.Equal(t, http.StatusOK, response1.Response.StatusCode)
require.JSONEq(t, `{"data":{"employees":[{"id":1},{"id":2},{"id":3},{"id":4},{"id":5},{"id":7},{"id":8},{"id":10},{"id":11},{"id":12}]}}`, response1.Body)

// Shutdown the router, then close the httptest server
shutdownCtx, shutdownCancel := context.WithTimeout(t.Context(), 5*time.Second)
defer shutdownCancel()
require.NoError(t, rr.Shutdown(shutdownCtx))
ts.Close()

// After shutdown, requests should fail
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
req, err = http.NewRequestWithContext(ctx, http.MethodPost, ts.URL+"/graphql", bytes.NewReader(data))
require.NoError(t, err)

_, err = testenv.MakeGraphQLRequestRawFromClient(req, &http.Client{})
require.Error(t, err)
})
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

func setupNewServerTest(t *testing.T) (rr *core.Router, cleanup func()) {
t.Helper()

employeesServer := httptest.NewServer(subgraphs.EmployeesHandler(&subgraphs.SubgraphOptions{
NatsPubSubByProviderID: map[string]natsPubsub.Adapter{},
}))

// Build the router config from the embedded template
replaced := testenv.ConfigJSONTemplate
replacements := map[string]string{
subgraphs.EmployeesDefaultDemoURL: testenv.GqlURL(employeesServer),
}
for k, v := range replacements {
replaced = strings.ReplaceAll(replaced, k, v)
}

var routerConfig nodev1.RouterConfig
require.NoError(t, protojson.Unmarshal([]byte(replaced), &routerConfig))

rr, err := core.NewRouter(
core.WithDisableUsageTracking(),
core.WithLogger(zap.NewNop()),
core.WithDevelopmentMode(true),
core.WithStaticExecutionConfig(&routerConfig),
core.WithEngineExecutionConfig(config.EngineExecutionConfiguration{}),
core.WithBatching(&core.BatchingConfig{}),
)
require.NoError(t, err)

allServers := []*httptest.Server{employeesServer}

return rr, func() {
_ = rr.Shutdown(t.Context())
for _, s := range allServers {
s.Close()
}
}
}
56 changes: 30 additions & 26 deletions router-tests/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,15 +602,15 @@ func CreateTestSupervisorEnv(t testing.TB, cfg *Config) (*Environment, error) {
}

replacements := map[string]string{
subgraphs.EmployeesDefaultDemoURL: gqlURL(employeesServer),
subgraphs.FamilyDefaultDemoURL: gqlURL(familyServer),
subgraphs.HobbiesDefaultDemoURL: gqlURL(hobbiesServer),
subgraphs.ProductsDefaultDemoURL: gqlURL(productsServer),
subgraphs.Test1DefaultDemoURL: gqlURL(test1Server),
subgraphs.AvailabilityDefaultDemoURL: gqlURL(availabilityServer),
subgraphs.MoodDefaultDemoURL: gqlURL(moodServer),
subgraphs.CountriesDefaultDemoURL: gqlURL(countriesServer),
subgraphs.ProductsFgDefaultDemoURL: gqlURL(productFgServer),
subgraphs.EmployeesDefaultDemoURL: GqlURL(employeesServer),
subgraphs.FamilyDefaultDemoURL: GqlURL(familyServer),
subgraphs.HobbiesDefaultDemoURL: GqlURL(hobbiesServer),
subgraphs.ProductsDefaultDemoURL: GqlURL(productsServer),
subgraphs.Test1DefaultDemoURL: GqlURL(test1Server),
subgraphs.AvailabilityDefaultDemoURL: GqlURL(availabilityServer),
subgraphs.MoodDefaultDemoURL: GqlURL(moodServer),
subgraphs.CountriesDefaultDemoURL: GqlURL(countriesServer),
subgraphs.ProductsFgDefaultDemoURL: GqlURL(productFgServer),
subgraphs.ProjectsDefaultDemoURL: grpcURL(endpoint),
}

Expand Down Expand Up @@ -1032,15 +1032,15 @@ func CreateTestEnv(t testing.TB, cfg *Config) (*Environment, error) {
}

replacements := map[string]string{
subgraphs.EmployeesDefaultDemoURL: gqlURL(employeesServer),
subgraphs.FamilyDefaultDemoURL: gqlURL(familyServer),
subgraphs.HobbiesDefaultDemoURL: gqlURL(hobbiesServer),
subgraphs.ProductsDefaultDemoURL: gqlURL(productsServer),
subgraphs.Test1DefaultDemoURL: gqlURL(test1Server),
subgraphs.AvailabilityDefaultDemoURL: gqlURL(availabilityServer),
subgraphs.MoodDefaultDemoURL: gqlURL(moodServer),
subgraphs.CountriesDefaultDemoURL: gqlURL(countriesServer),
subgraphs.ProductsFgDefaultDemoURL: gqlURL(productFgServer),
subgraphs.EmployeesDefaultDemoURL: GqlURL(employeesServer),
subgraphs.FamilyDefaultDemoURL: GqlURL(familyServer),
subgraphs.HobbiesDefaultDemoURL: GqlURL(hobbiesServer),
subgraphs.ProductsDefaultDemoURL: GqlURL(productsServer),
subgraphs.Test1DefaultDemoURL: GqlURL(test1Server),
subgraphs.AvailabilityDefaultDemoURL: GqlURL(availabilityServer),
subgraphs.MoodDefaultDemoURL: GqlURL(moodServer),
subgraphs.CountriesDefaultDemoURL: GqlURL(countriesServer),
subgraphs.ProductsFgDefaultDemoURL: GqlURL(productFgServer),
subgraphs.ProjectsDefaultDemoURL: grpcURL(endpoint),
}

Expand Down Expand Up @@ -1316,12 +1316,12 @@ func configureRouter(listenerAddr string, testConfig *Config, routerConfig *node
}

engineExecutionConfig := config.EngineExecutionConfiguration{
EnableNetPoll: true,
EnableSingleFlight: true,
EnableInboundRequestDeduplication: false,
EnableRequestTracing: true,
EnableNormalizationCache: true,
NormalizationCacheSize: 1024,
EnableNetPoll: true,
EnableSingleFlight: true,
EnableInboundRequestDeduplication: false,
EnableRequestTracing: true,
EnableNormalizationCache: true,
NormalizationCacheSize: 1024,
Debug: config.EngineDebugConfiguration{
ReportWebSocketConnections: true,
PrintQueryPlans: false,
Expand Down Expand Up @@ -1735,7 +1735,7 @@ func SetupCDNServer(t testing.TB) (cdnServer *httptest.Server, port int) {
return cdnServer, port
}

func gqlURL(srv *httptest.Server) string {
func GqlURL(srv *httptest.Server) string {
path, err := url.JoinPath(srv.URL, "/graphql")
if err != nil {
panic(err)
Expand Down Expand Up @@ -2143,8 +2143,12 @@ func (e *Environment) newGraphQLRequestOverGET(baseURL string, request GraphQLRe
}

func (e *Environment) MakeGraphQLRequestRaw(request *http.Request) (*TestResponse, error) {
return MakeGraphQLRequestRawFromClient(request, e.RouterClient)
}

func MakeGraphQLRequestRawFromClient(request *http.Request, routerClient *http.Client) (*TestResponse, error) {
request.Header.Set("Accept-Encoding", "identity")
resp, err := e.RouterClient.Do(request)
resp, err := routerClient.Do(request)
if err != nil {
return nil, err
}
Expand Down
58 changes: 58 additions & 0 deletions router/core/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,64 @@ func (r *Router) BaseURL() string {
return r.baseURL
}

// NewServer prepares a new server instance but does not start it. The method should only be used when you want to bootstrap
// the server manually otherwise you can use Router.Start(). You're responsible for setting health checks status to ready with Server.HealthChecks().
// The server can be shutdown with Router.Shutdown(). Use core.WithExecutionConfig to pass the initial config otherwise the Router will
// try to fetch the config from the control plane. You can swap the router config by using Router.newGraphServer().
func (r *Router) NewServer(ctx context.Context) (Server, error) {
if r.shutdown.Load() {
return nil, errors.New("router is shutdown. Create a new instance with router.NewRouter()")
}

if err := r.bootstrap(ctx); err != nil {
return nil, fmt.Errorf("failed to bootstrap application: %w", err)
}

r.httpServer = newServer(&httpServerOptions{
addr: r.listenAddr,
logger: r.logger,
tlsConfig: r.tlsConfig,
tlsServerConfig: r.tlsServerConfig,
healthcheck: r.healthcheck,
baseURL: r.baseURL,
maxHeaderBytes: int(r.routerTrafficConfig.MaxHeaderBytes.Uint64()),
livenessCheckPath: r.livenessCheckPath,
readinessCheckPath: r.readinessCheckPath,
healthCheckPath: r.healthCheckPath,
})

r.configureUsageTracking(ctx)

if r.reloadPersistentState == nil {
r.reloadPersistentState = NewReloadPersistentState(r.logger)
}

r.reloadPersistentState.UpdateReloadPersistentState(&r.Config)

// Start the server with the static config without polling
if r.staticExecutionConfig != nil {
r.logger.Info("Static execution config provided. Polling is disabled. Updating execution config is only possible by providing a config.")
return r.httpServer, r.newServer(ctx, r.staticExecutionConfig)
}

// when no static config is provided and no poller is configured, we can't start the server
if r.configPoller == nil {
return nil, errors.New("config fetcher not provided. Please provide a static execution config instead")
}

cfg, err := r.configPoller.GetRouterConfig(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get initial execution config: %w", err)
}

if err := r.newServer(ctx, cfg.Config); err != nil {
r.logger.Error("Failed to start server with initial config", zap.Error(err))
return nil, err
}

return r.httpServer, nil
}

// bootstrap initializes the Router. It is called by Start() and NewServer().
// It should only be called once for a Router instance.
func (r *Router) bootstrap(ctx context.Context) error {
Expand Down
Loading