diff --git a/router-tests/lifecycle/new_server_test.go b/router-tests/lifecycle/new_server_test.go new file mode 100644 index 0000000000..c1ae396097 --- /dev/null +++ b/router-tests/lifecycle/new_server_test.go @@ -0,0 +1,153 @@ +package integration + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/wundergraph/cosmo/demo/pkg/subgraphs" + "github.com/wundergraph/cosmo/router-tests/testenv" + "github.com/wundergraph/cosmo/router/core" + nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" + "github.com/wundergraph/cosmo/router/pkg/config" + natsPubsub "github.com/wundergraph/cosmo/router/pkg/pubsub/nats" + "go.uber.org/zap" + "google.golang.org/protobuf/encoding/protojson" +) + +func TestNewServer(t *testing.T) { + t.Parallel() + + t.Run("creates a working server without starting a listener", func(t *testing.T) { + t.Parallel() + + rr, cleanup := setupNewServerTest(t) + defer cleanup() + + svr, err := rr.NewServer(t.Context()) + require.NoError(t, err) + + svr.HealthChecks().SetReady(true) + ts := httptest.NewServer(svr.HttpServer().Handler) + defer ts.Close() + + data, err := json.Marshal(testenv.GraphQLRequest{Query: `query { employees { id } }`}) + require.NoError(t, err) + + req, err := http.NewRequestWithContext(t.Context(), http.MethodPost, ts.URL+"/graphql", bytes.NewReader(data)) + require.NoError(t, err) + + response, err := testenv.MakeGraphQLRequestRawFromClient(req, &http.Client{}) + require.NoError(t, err) + + require.Equal(t, http.StatusOK, response.Response.StatusCode) + require.JSONEq(t, `{"data":{"employees":[{"id":1},{"id":2},{"id":3},{"id":4},{"id":5},{"id":7},{"id":8},{"id":10},{"id":11},{"id":12}]}}`, response.Body) + }) + + t.Run("health check endpoint is reachable", func(t *testing.T) { + t.Parallel() + + rr, cleanup := setupNewServerTest(t) + defer cleanup() + + svr, err := rr.NewServer(t.Context()) + require.NoError(t, err) + + svr.HealthChecks().SetReady(true) + ts := httptest.NewServer(svr.HttpServer().Handler) + defer ts.Close() + + resp, err := http.Get(ts.URL + "/health/ready") + require.NoError(t, err) + defer func() { + _ = resp.Body.Close() + }() + require.Equal(t, http.StatusOK, resp.StatusCode) + }) + + t.Run("new server shutdown prevents further requests", func(t *testing.T) { + t.Parallel() + + rr, cleanup := setupNewServerTest(t) + defer cleanup() + + svr, err := rr.NewServer(t.Context()) + require.NoError(t, err) + + svr.HealthChecks().SetReady(true) + ts := httptest.NewServer(svr.HttpServer().Handler) + + // Verify the server works before shutdown + data, err := json.Marshal(testenv.GraphQLRequest{Query: `query { employees { id } }`}) + require.NoError(t, err) + + req, err := http.NewRequestWithContext(t.Context(), http.MethodPost, ts.URL+"/graphql", bytes.NewReader(data)) + require.NoError(t, err) + + response1, err := testenv.MakeGraphQLRequestRawFromClient(req, &http.Client{}) + require.NoError(t, err) + + require.Equal(t, http.StatusOK, response1.Response.StatusCode) + require.JSONEq(t, `{"data":{"employees":[{"id":1},{"id":2},{"id":3},{"id":4},{"id":5},{"id":7},{"id":8},{"id":10},{"id":11},{"id":12}]}}`, response1.Body) + + // Shutdown the router, then close the httptest server + shutdownCtx, shutdownCancel := context.WithTimeout(t.Context(), 5*time.Second) + defer shutdownCancel() + require.NoError(t, rr.Shutdown(shutdownCtx)) + ts.Close() + + // After shutdown, requests should fail + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + req, err = http.NewRequestWithContext(ctx, http.MethodPost, ts.URL+"/graphql", bytes.NewReader(data)) + require.NoError(t, err) + + _, err = testenv.MakeGraphQLRequestRawFromClient(req, &http.Client{}) + require.Error(t, err) + }) +} + +func setupNewServerTest(t *testing.T) (rr *core.Router, cleanup func()) { + t.Helper() + + employeesServer := httptest.NewServer(subgraphs.EmployeesHandler(&subgraphs.SubgraphOptions{ + NatsPubSubByProviderID: map[string]natsPubsub.Adapter{}, + })) + + // Build the router config from the embedded template + replaced := testenv.ConfigJSONTemplate + replacements := map[string]string{ + subgraphs.EmployeesDefaultDemoURL: testenv.GqlURL(employeesServer), + } + for k, v := range replacements { + replaced = strings.ReplaceAll(replaced, k, v) + } + + var routerConfig nodev1.RouterConfig + require.NoError(t, protojson.Unmarshal([]byte(replaced), &routerConfig)) + + rr, err := core.NewRouter( + core.WithDisableUsageTracking(), + core.WithLogger(zap.NewNop()), + core.WithDevelopmentMode(true), + core.WithStaticExecutionConfig(&routerConfig), + core.WithEngineExecutionConfig(config.EngineExecutionConfiguration{}), + core.WithBatching(&core.BatchingConfig{}), + ) + require.NoError(t, err) + + allServers := []*httptest.Server{employeesServer} + + return rr, func() { + _ = rr.Shutdown(t.Context()) + for _, s := range allServers { + s.Close() + } + } +} diff --git a/router-tests/testenv/testenv.go b/router-tests/testenv/testenv.go index 0b9b9b2806..4b35a3332c 100644 --- a/router-tests/testenv/testenv.go +++ b/router-tests/testenv/testenv.go @@ -602,15 +602,15 @@ func CreateTestSupervisorEnv(t testing.TB, cfg *Config) (*Environment, error) { } replacements := map[string]string{ - subgraphs.EmployeesDefaultDemoURL: gqlURL(employeesServer), - subgraphs.FamilyDefaultDemoURL: gqlURL(familyServer), - subgraphs.HobbiesDefaultDemoURL: gqlURL(hobbiesServer), - subgraphs.ProductsDefaultDemoURL: gqlURL(productsServer), - subgraphs.Test1DefaultDemoURL: gqlURL(test1Server), - subgraphs.AvailabilityDefaultDemoURL: gqlURL(availabilityServer), - subgraphs.MoodDefaultDemoURL: gqlURL(moodServer), - subgraphs.CountriesDefaultDemoURL: gqlURL(countriesServer), - subgraphs.ProductsFgDefaultDemoURL: gqlURL(productFgServer), + subgraphs.EmployeesDefaultDemoURL: GqlURL(employeesServer), + subgraphs.FamilyDefaultDemoURL: GqlURL(familyServer), + subgraphs.HobbiesDefaultDemoURL: GqlURL(hobbiesServer), + subgraphs.ProductsDefaultDemoURL: GqlURL(productsServer), + subgraphs.Test1DefaultDemoURL: GqlURL(test1Server), + subgraphs.AvailabilityDefaultDemoURL: GqlURL(availabilityServer), + subgraphs.MoodDefaultDemoURL: GqlURL(moodServer), + subgraphs.CountriesDefaultDemoURL: GqlURL(countriesServer), + subgraphs.ProductsFgDefaultDemoURL: GqlURL(productFgServer), subgraphs.ProjectsDefaultDemoURL: grpcURL(endpoint), } @@ -1032,15 +1032,15 @@ func CreateTestEnv(t testing.TB, cfg *Config) (*Environment, error) { } replacements := map[string]string{ - subgraphs.EmployeesDefaultDemoURL: gqlURL(employeesServer), - subgraphs.FamilyDefaultDemoURL: gqlURL(familyServer), - subgraphs.HobbiesDefaultDemoURL: gqlURL(hobbiesServer), - subgraphs.ProductsDefaultDemoURL: gqlURL(productsServer), - subgraphs.Test1DefaultDemoURL: gqlURL(test1Server), - subgraphs.AvailabilityDefaultDemoURL: gqlURL(availabilityServer), - subgraphs.MoodDefaultDemoURL: gqlURL(moodServer), - subgraphs.CountriesDefaultDemoURL: gqlURL(countriesServer), - subgraphs.ProductsFgDefaultDemoURL: gqlURL(productFgServer), + subgraphs.EmployeesDefaultDemoURL: GqlURL(employeesServer), + subgraphs.FamilyDefaultDemoURL: GqlURL(familyServer), + subgraphs.HobbiesDefaultDemoURL: GqlURL(hobbiesServer), + subgraphs.ProductsDefaultDemoURL: GqlURL(productsServer), + subgraphs.Test1DefaultDemoURL: GqlURL(test1Server), + subgraphs.AvailabilityDefaultDemoURL: GqlURL(availabilityServer), + subgraphs.MoodDefaultDemoURL: GqlURL(moodServer), + subgraphs.CountriesDefaultDemoURL: GqlURL(countriesServer), + subgraphs.ProductsFgDefaultDemoURL: GqlURL(productFgServer), subgraphs.ProjectsDefaultDemoURL: grpcURL(endpoint), } @@ -1316,12 +1316,12 @@ func configureRouter(listenerAddr string, testConfig *Config, routerConfig *node } engineExecutionConfig := config.EngineExecutionConfiguration{ - EnableNetPoll: true, - EnableSingleFlight: true, - EnableInboundRequestDeduplication: false, - EnableRequestTracing: true, - EnableNormalizationCache: true, - NormalizationCacheSize: 1024, + EnableNetPoll: true, + EnableSingleFlight: true, + EnableInboundRequestDeduplication: false, + EnableRequestTracing: true, + EnableNormalizationCache: true, + NormalizationCacheSize: 1024, Debug: config.EngineDebugConfiguration{ ReportWebSocketConnections: true, PrintQueryPlans: false, @@ -1735,7 +1735,7 @@ func SetupCDNServer(t testing.TB) (cdnServer *httptest.Server, port int) { return cdnServer, port } -func gqlURL(srv *httptest.Server) string { +func GqlURL(srv *httptest.Server) string { path, err := url.JoinPath(srv.URL, "/graphql") if err != nil { panic(err) @@ -2143,8 +2143,12 @@ func (e *Environment) newGraphQLRequestOverGET(baseURL string, request GraphQLRe } func (e *Environment) MakeGraphQLRequestRaw(request *http.Request) (*TestResponse, error) { + return MakeGraphQLRequestRawFromClient(request, e.RouterClient) +} + +func MakeGraphQLRequestRawFromClient(request *http.Request, routerClient *http.Client) (*TestResponse, error) { request.Header.Set("Accept-Encoding", "identity") - resp, err := e.RouterClient.Do(request) + resp, err := routerClient.Do(request) if err != nil { return nil, err } diff --git a/router/core/router.go b/router/core/router.go index 9d17412ee5..45c95ad663 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -728,6 +728,64 @@ func (r *Router) BaseURL() string { return r.baseURL } +// NewServer prepares a new server instance but does not start it. The method should only be used when you want to bootstrap +// the server manually otherwise you can use Router.Start(). You're responsible for setting health checks status to ready with Server.HealthChecks(). +// The server can be shutdown with Router.Shutdown(). Use core.WithExecutionConfig to pass the initial config otherwise the Router will +// try to fetch the config from the control plane. You can swap the router config by using Router.newGraphServer(). +func (r *Router) NewServer(ctx context.Context) (Server, error) { + if r.shutdown.Load() { + return nil, errors.New("router is shutdown. Create a new instance with router.NewRouter()") + } + + if err := r.bootstrap(ctx); err != nil { + return nil, fmt.Errorf("failed to bootstrap application: %w", err) + } + + r.httpServer = newServer(&httpServerOptions{ + addr: r.listenAddr, + logger: r.logger, + tlsConfig: r.tlsConfig, + tlsServerConfig: r.tlsServerConfig, + healthcheck: r.healthcheck, + baseURL: r.baseURL, + maxHeaderBytes: int(r.routerTrafficConfig.MaxHeaderBytes.Uint64()), + livenessCheckPath: r.livenessCheckPath, + readinessCheckPath: r.readinessCheckPath, + healthCheckPath: r.healthCheckPath, + }) + + r.configureUsageTracking(ctx) + + if r.reloadPersistentState == nil { + r.reloadPersistentState = NewReloadPersistentState(r.logger) + } + + r.reloadPersistentState.UpdateReloadPersistentState(&r.Config) + + // Start the server with the static config without polling + if r.staticExecutionConfig != nil { + r.logger.Info("Static execution config provided. Polling is disabled. Updating execution config is only possible by providing a config.") + return r.httpServer, r.newServer(ctx, r.staticExecutionConfig) + } + + // when no static config is provided and no poller is configured, we can't start the server + if r.configPoller == nil { + return nil, errors.New("config fetcher not provided. Please provide a static execution config instead") + } + + cfg, err := r.configPoller.GetRouterConfig(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get initial execution config: %w", err) + } + + if err := r.newServer(ctx, cfg.Config); err != nil { + r.logger.Error("Failed to start server with initial config", zap.Error(err)) + return nil, err + } + + return r.httpServer, nil +} + // bootstrap initializes the Router. It is called by Start() and NewServer(). // It should only be called once for a Router instance. func (r *Router) bootstrap(ctx context.Context) error {