diff --git a/router-tests/structured_logging_test.go b/router-tests/structured_logging_test.go index 7a727ac275..564bfb4732 100644 --- a/router-tests/structured_logging_test.go +++ b/router-tests/structured_logging_test.go @@ -10,15 +10,15 @@ import ( "path/filepath" "testing" - "github.com/wundergraph/cosmo/router/core" - "github.com/wundergraph/cosmo/router/pkg/config" - "github.com/wundergraph/cosmo/router/pkg/logging" - "github.com/wundergraph/cosmo/router/pkg/trace/tracetest" + "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zapcore" - "github.com/stretchr/testify/require" "github.com/wundergraph/cosmo/router-tests/testenv" + "github.com/wundergraph/cosmo/router/core" + "github.com/wundergraph/cosmo/router/pkg/config" + "github.com/wundergraph/cosmo/router/pkg/logging" + "github.com/wundergraph/cosmo/router/pkg/trace/tracetest" ) // Interface guard @@ -172,7 +172,7 @@ func TestRouterStartLogs(t *testing.T) { require.Equal(t, playgroundLog.Len(), 1) featureFlagLog := xEnv.Observer().FilterMessage("Feature flags enabled") require.Equal(t, featureFlagLog.Len(), 1) - serverListeningLog := xEnv.Observer().FilterMessage("Server listening and serving") + serverListeningLog := xEnv.Observer().FilterMessage("Server initialized and ready to serve requests") require.Equal(t, serverListeningLog.Len(), 1) }) } diff --git a/router-tests/testenv/testenv.go b/router-tests/testenv/testenv.go index ac36b4c557..39fd18fb31 100644 --- a/router-tests/testenv/testenv.go +++ b/router-tests/testenv/testenv.go @@ -6,11 +6,9 @@ import ( "context" "crypto/tls" "crypto/x509" - _ "embed" "encoding/json" "errors" "fmt" - "github.com/hashicorp/consul/sdk/freeport" "io" "log" "math/rand" @@ -30,42 +28,38 @@ import ( "testing" "time" - "github.com/wundergraph/cosmo/router/pkg/logging" - - "go.uber.org/zap/zaptest/observer" - - "github.com/wundergraph/cosmo/router/pkg/controlplane/configpoller" - + "github.com/golang-jwt/jwt/v5" + "github.com/google/uuid" + "github.com/gorilla/websocket" + "github.com/hashicorp/consul/sdk/freeport" + "github.com/hashicorp/go-cleanhttp" + "github.com/hashicorp/go-retryablehttp" + "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" - + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" - - "github.com/google/uuid" - "github.com/prometheus/client_golang/prometheus" - rmetric "github.com/wundergraph/cosmo/router/pkg/metric" - pubsubNats "github.com/wundergraph/cosmo/router/pkg/pubsub/nats" - "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/pubsub_datasource" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/trace" - - "github.com/hashicorp/go-cleanhttp" - - "github.com/golang-jwt/jwt/v5" - "github.com/gorilla/websocket" - "github.com/hashicorp/go-retryablehttp" - "github.com/nats-io/nats.go" - "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" "google.golang.org/protobuf/encoding/protojson" "github.com/wundergraph/cosmo/demo/pkg/subgraphs" "github.com/wundergraph/cosmo/router/core" nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" "github.com/wundergraph/cosmo/router/pkg/config" + "github.com/wundergraph/cosmo/router/pkg/controlplane/configpoller" + "github.com/wundergraph/cosmo/router/pkg/logging" + rmetric "github.com/wundergraph/cosmo/router/pkg/metric" + pubsubNats "github.com/wundergraph/cosmo/router/pkg/pubsub/nats" + "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/pubsub_datasource" + + _ "embed" ) var ErrEnvironmentClosed = errors.New("test environment closed") @@ -717,7 +711,7 @@ func createTestEnv(t testing.TB, cfg *Config) (*Environment, error) { } } - waitErr := e.WaitForServer(ctx, e.RouterURL+"/health/live", 100, 10) + waitErr := e.WaitForServer(ctx, e.RouterURL+"/health/ready", 100, 10) return e, waitErr } diff --git a/router/core/http_server.go b/router/core/http_server.go index 5edd244391..d6c0306f01 100644 --- a/router/core/http_server.go +++ b/router/core/http_server.go @@ -4,11 +4,14 @@ import ( "context" "crypto/tls" "errors" - "github.com/wundergraph/cosmo/router/pkg/health" - "go.uber.org/zap" "net/http" "sync" "time" + + "github.com/go-chi/chi/v5" + "go.uber.org/zap" + + "github.com/wundergraph/cosmo/router/pkg/health" ) type server struct { @@ -23,13 +26,16 @@ type server struct { } type httpServerOptions struct { - addr string - logger *zap.Logger - tlsConfig *TlsConfig - tlsServerConfig *tls.Config - healthcheck health.Checker - baseURL string - maxHeaderBytes int + addr string + logger *zap.Logger + tlsConfig *TlsConfig + tlsServerConfig *tls.Config + healthcheck health.Checker + baseURL string + maxHeaderBytes int + livenessCheckPath string + readinessCheckPath string + healthCheckPath string } func newServer(opts *httpServerOptions) *server { @@ -44,6 +50,12 @@ func newServer(opts *httpServerOptions) *server { MaxHeaderBytes: opts.maxHeaderBytes, } + // Create default handler for liveness and readiness + httpRouter := chi.NewMux() + httpRouter.Get(opts.healthCheckPath, opts.healthcheck.Liveness()) + httpRouter.Get(opts.livenessCheckPath, opts.healthcheck.Liveness()) + httpRouter.Get(opts.readinessCheckPath, opts.healthcheck.Readiness()) + n := &server{ httpServer: httpServer, tlsConfig: opts.tlsConfig, @@ -51,6 +63,7 @@ func newServer(opts *httpServerOptions) *server { mu: sync.RWMutex{}, healthcheck: opts.healthcheck, baseURL: opts.baseURL, + handler: httpRouter, } httpServer.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -84,7 +97,7 @@ func (s *server) HttpServer() *http.Server { // NOT SAFE FOR CONCURRENT USE. func (s *server) SwapGraphServer(ctx context.Context, svr *graphServer) { - needsShutdown := s.handler != nil + needsShutdown := s.handler != nil && s.graphServer != nil // Swap the handler immediately, so we can shut down the old server in the same goroutine // and no other config changes can happen in the meantime. diff --git a/router/core/router.go b/router/core/router.go index c4612aa25f..f3e1a8ad9f 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -13,46 +13,42 @@ import ( "sync" "time" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/propagation" - - "github.com/wundergraph/graphql-go-tools/v2/pkg/netpoll" - - "github.com/wundergraph/cosmo/router/internal/persistedoperation/apq" - "github.com/wundergraph/cosmo/router/internal/persistedoperation/operationstorage/cdn" - "github.com/wundergraph/cosmo/router/internal/persistedoperation/operationstorage/s3" - "connectrpc.com/connect" - "github.com/wundergraph/cosmo/router/pkg/watcher" - + "github.com/mitchellh/mapstructure" "github.com/nats-io/nuid" "github.com/redis/go-redis/v9" - "github.com/wundergraph/cosmo/router/internal/docker" - "github.com/wundergraph/cosmo/router/internal/graphiql" - "github.com/wundergraph/cosmo/router/internal/persistedoperation" - "github.com/wundergraph/cosmo/router/pkg/execution_config" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.uber.org/atomic" + "go.uber.org/zap" - "github.com/mitchellh/mapstructure" "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1/graphqlmetricsv1connect" nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" "github.com/wundergraph/cosmo/router/internal/debug" + "github.com/wundergraph/cosmo/router/internal/docker" + "github.com/wundergraph/cosmo/router/internal/graphiql" "github.com/wundergraph/cosmo/router/internal/graphqlmetrics" + "github.com/wundergraph/cosmo/router/internal/persistedoperation" + "github.com/wundergraph/cosmo/router/internal/persistedoperation/apq" + "github.com/wundergraph/cosmo/router/internal/persistedoperation/operationstorage/cdn" + "github.com/wundergraph/cosmo/router/internal/persistedoperation/operationstorage/s3" "github.com/wundergraph/cosmo/router/internal/retrytransport" "github.com/wundergraph/cosmo/router/internal/stringsx" "github.com/wundergraph/cosmo/router/pkg/config" "github.com/wundergraph/cosmo/router/pkg/controlplane/configpoller" "github.com/wundergraph/cosmo/router/pkg/controlplane/selfregister" "github.com/wundergraph/cosmo/router/pkg/cors" + "github.com/wundergraph/cosmo/router/pkg/execution_config" "github.com/wundergraph/cosmo/router/pkg/health" rmetric "github.com/wundergraph/cosmo/router/pkg/metric" "github.com/wundergraph/cosmo/router/pkg/otel/otelconfig" "github.com/wundergraph/cosmo/router/pkg/statistics" rtrace "github.com/wundergraph/cosmo/router/pkg/trace" - "go.opentelemetry.io/otel/attribute" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.uber.org/zap" + "github.com/wundergraph/cosmo/router/pkg/watcher" + "github.com/wundergraph/graphql-go-tools/v2/pkg/netpoll" ) type IPAnonymizationMethod string @@ -606,17 +602,7 @@ func (r *Router) newServer(ctx context.Context, cfg *nodev1.RouterConfig) error return nil } -func (r *Router) listenAndServe(cfg *nodev1.RouterConfig) error { - r.logger.Info("Server listening and serving", - zap.String("listen_addr", r.listenAddr), - zap.Bool("playground", r.playgroundConfig.Enabled), - zap.Bool("introspection", r.introspection), - zap.String("config_version", cfg.GetVersion()), - ) - - // Mark the server as ready - r.httpServer.healthcheck.SetReady(true) - +func (r *Router) listenAndServe() error { go func() { // Mark the server as not ready when the server is stopped defer r.httpServer.healthcheck.SetReady(false) @@ -717,13 +703,16 @@ func (r *Router) NewServer(ctx context.Context) (Server, error) { } r.httpServer = newServer(&httpServerOptions{ - addr: r.listenAddr, - logger: r.logger, - tlsConfig: r.tlsConfig, - tlsServerConfig: r.tlsServerConfig, - healthcheck: r.healthcheck, - baseURL: r.baseURL, - maxHeaderBytes: int(r.routerTrafficConfig.MaxHeaderBytes.Uint64()), + addr: r.listenAddr, + logger: r.logger, + tlsConfig: r.tlsConfig, + tlsServerConfig: r.tlsServerConfig, + healthcheck: r.healthcheck, + baseURL: r.baseURL, + maxHeaderBytes: int(r.routerTrafficConfig.MaxHeaderBytes.Uint64()), + livenessCheckPath: r.livenessCheckPath, + readinessCheckPath: r.readinessCheckPath, + healthCheckPath: r.healthCheckPath, }) // Start the server with the static config without polling @@ -1054,25 +1043,39 @@ func (r *Router) Start(ctx context.Context) error { } r.httpServer = newServer(&httpServerOptions{ - addr: r.listenAddr, - logger: r.logger, - tlsConfig: r.tlsConfig, - tlsServerConfig: r.tlsServerConfig, - healthcheck: r.healthcheck, - baseURL: r.baseURL, - maxHeaderBytes: int(r.routerTrafficConfig.MaxHeaderBytes.Uint64()), + addr: r.listenAddr, + logger: r.logger, + tlsConfig: r.tlsConfig, + tlsServerConfig: r.tlsServerConfig, + healthcheck: r.healthcheck, + baseURL: r.baseURL, + maxHeaderBytes: int(r.routerTrafficConfig.MaxHeaderBytes.Uint64()), + livenessCheckPath: r.livenessCheckPath, + readinessCheckPath: r.readinessCheckPath, + healthCheckPath: r.healthCheckPath, }) // Start the server with the static config without polling if r.staticExecutionConfig != nil { - if err := r.newServer(ctx, r.staticExecutionConfig); err != nil { + if err := r.listenAndServe(); err != nil { return err } - if err := r.listenAndServe(r.staticExecutionConfig); err != nil { + if err := r.newServer(ctx, r.staticExecutionConfig); err != nil { return err } + defer func() { + r.httpServer.healthcheck.SetReady(true) + + r.logger.Info("Server initialized and ready to serve requests", + zap.String("listen_addr", r.listenAddr), + zap.Bool("playground", r.playgroundConfig.Enabled), + zap.Bool("introspection", r.introspection), + zap.String("config_version", r.staticExecutionConfig.Version), + ) + }() + if r.executionConfig != nil && r.executionConfig.Watch { w, err := watcher.NewWatcher(r.logger.With(zap.String("watcher", "execution_config"))) @@ -1136,6 +1139,11 @@ func (r *Router) Start(ctx context.Context) error { return fmt.Errorf("failed to get initial execution config: %w", err) } + if err := r.listenAndServe(); err != nil { + r.logger.Error("Failed to start server with initial config", zap.Error(err)) + return err + } + if err := r.newServer(ctx, cfg.Config); err != nil { return err } @@ -1172,11 +1180,6 @@ func (r *Router) Start(ctx context.Context) error { ) } - if err := r.listenAndServe(cfg.Config); err != nil { - r.logger.Error("Failed to start server with initial config", zap.Error(err)) - return err - } - r.configPoller.Subscribe(ctx, func(newConfig *nodev1.RouterConfig, oldVersion string) error { if r.shutdown.Load() { r.logger.Warn("Router is in shutdown state. Skipping config update") @@ -1190,6 +1193,16 @@ func (r *Router) Start(ctx context.Context) error { return nil }) + // Mark the server as ready + r.httpServer.healthcheck.SetReady(true) + + r.logger.Info("Server initialized and ready to serve requests", + zap.String("listen_addr", r.listenAddr), + zap.Bool("playground", r.playgroundConfig.Enabled), + zap.Bool("introspection", r.introspection), + zap.String("config_version", cfg.Config.GetVersion()), + ) + return nil }