Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions router-tests/structured_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (
"path/filepath"
"testing"

"github.com/wundergraph/cosmo/router/core"
"github.com/wundergraph/cosmo/router/pkg/config"
"github.com/wundergraph/cosmo/router/pkg/logging"
"github.com/wundergraph/cosmo/router/pkg/trace/tracetest"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/stretchr/testify/require"
"github.com/wundergraph/cosmo/router-tests/testenv"
"github.com/wundergraph/cosmo/router/core"
"github.com/wundergraph/cosmo/router/pkg/config"
"github.com/wundergraph/cosmo/router/pkg/logging"
"github.com/wundergraph/cosmo/router/pkg/trace/tracetest"
)

// Interface guard
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestRouterStartLogs(t *testing.T) {
require.Equal(t, playgroundLog.Len(), 1)
featureFlagLog := xEnv.Observer().FilterMessage("Feature flags enabled")
require.Equal(t, featureFlagLog.Len(), 1)
serverListeningLog := xEnv.Observer().FilterMessage("Server listening and serving")
serverListeningLog := xEnv.Observer().FilterMessage("Server initialized and ready to serve requests")
require.Equal(t, serverListeningLog.Len(), 1)
})
}
Expand Down
42 changes: 18 additions & 24 deletions router-tests/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ import (
"context"
"crypto/tls"
"crypto/x509"
_ "embed"
"encoding/json"
"errors"
"fmt"
"github.com/hashicorp/consul/sdk/freeport"
"io"
"log"
"math/rand"
Expand All @@ -30,42 +28,38 @@ import (
"testing"
"time"

"github.com/wundergraph/cosmo/router/pkg/logging"

"go.uber.org/zap/zaptest/observer"

"github.com/wundergraph/cosmo/router/pkg/controlplane/configpoller"

"github.com/golang-jwt/jwt/v5"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-retryablehttp"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
rmetric "github.com/wundergraph/cosmo/router/pkg/metric"
pubsubNats "github.com/wundergraph/cosmo/router/pkg/pubsub/nats"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/pubsub_datasource"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/trace"

"github.com/hashicorp/go-cleanhttp"

"github.com/golang-jwt/jwt/v5"
"github.com/gorilla/websocket"
"github.com/hashicorp/go-retryablehttp"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/protobuf/encoding/protojson"

"github.com/wundergraph/cosmo/demo/pkg/subgraphs"
"github.com/wundergraph/cosmo/router/core"
nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"
"github.com/wundergraph/cosmo/router/pkg/config"
"github.com/wundergraph/cosmo/router/pkg/controlplane/configpoller"
"github.com/wundergraph/cosmo/router/pkg/logging"
rmetric "github.com/wundergraph/cosmo/router/pkg/metric"
pubsubNats "github.com/wundergraph/cosmo/router/pkg/pubsub/nats"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/pubsub_datasource"

_ "embed"
Comment thread
Noroth marked this conversation as resolved.
)

var ErrEnvironmentClosed = errors.New("test environment closed")
Expand Down Expand Up @@ -717,7 +711,7 @@ func createTestEnv(t testing.TB, cfg *Config) (*Environment, error) {
}
}

waitErr := e.WaitForServer(ctx, e.RouterURL+"/health/live", 100, 10)
waitErr := e.WaitForServer(ctx, e.RouterURL+"/health/ready", 100, 10)
Comment thread
jensneuse marked this conversation as resolved.

return e, waitErr
}
Expand Down
33 changes: 23 additions & 10 deletions router/core/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"context"
"crypto/tls"
"errors"
"github.com/wundergraph/cosmo/router/pkg/health"
"go.uber.org/zap"
"net/http"
"sync"
"time"

"github.com/go-chi/chi/v5"
"go.uber.org/zap"

"github.com/wundergraph/cosmo/router/pkg/health"
)

type server struct {
Expand All @@ -23,13 +26,16 @@ type server struct {
}

type httpServerOptions struct {
addr string
logger *zap.Logger
tlsConfig *TlsConfig
tlsServerConfig *tls.Config
healthcheck health.Checker
baseURL string
maxHeaderBytes int
addr string
logger *zap.Logger
tlsConfig *TlsConfig
tlsServerConfig *tls.Config
healthcheck health.Checker
baseURL string
maxHeaderBytes int
livenessCheckPath string
readinessCheckPath string
healthCheckPath string
}

func newServer(opts *httpServerOptions) *server {
Expand All @@ -44,13 +50,20 @@ func newServer(opts *httpServerOptions) *server {
MaxHeaderBytes: opts.maxHeaderBytes,
}

// Create default handler for liveness and readiness
httpRouter := chi.NewMux()
Comment thread
Noroth marked this conversation as resolved.
httpRouter.Get(opts.healthCheckPath, opts.healthcheck.Liveness())
httpRouter.Get(opts.livenessCheckPath, opts.healthcheck.Liveness())
httpRouter.Get(opts.readinessCheckPath, opts.healthcheck.Readiness())

n := &server{
httpServer: httpServer,
tlsConfig: opts.tlsConfig,
logger: opts.logger,
mu: sync.RWMutex{},
healthcheck: opts.healthcheck,
baseURL: opts.baseURL,
handler: httpRouter,
}

httpServer.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -84,7 +97,7 @@ func (s *server) HttpServer() *http.Server {
// NOT SAFE FOR CONCURRENT USE.
func (s *server) SwapGraphServer(ctx context.Context, svr *graphServer) {

needsShutdown := s.handler != nil
needsShutdown := s.handler != nil && s.graphServer != nil

// Swap the handler immediately, so we can shut down the old server in the same goroutine
// and no other config changes can happen in the meantime.
Expand Down
117 changes: 65 additions & 52 deletions router/core/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,46 +13,42 @@ import (
"sync"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"

"github.com/wundergraph/graphql-go-tools/v2/pkg/netpoll"

"github.com/wundergraph/cosmo/router/internal/persistedoperation/apq"
"github.com/wundergraph/cosmo/router/internal/persistedoperation/operationstorage/cdn"
"github.com/wundergraph/cosmo/router/internal/persistedoperation/operationstorage/s3"

"connectrpc.com/connect"
"github.com/wundergraph/cosmo/router/pkg/watcher"

"github.com/mitchellh/mapstructure"
"github.com/nats-io/nuid"
"github.com/redis/go-redis/v9"
"github.com/wundergraph/cosmo/router/internal/docker"
"github.com/wundergraph/cosmo/router/internal/graphiql"
"github.com/wundergraph/cosmo/router/internal/persistedoperation"
"github.com/wundergraph/cosmo/router/pkg/execution_config"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/mitchellh/mapstructure"
"github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1/graphqlmetricsv1connect"
nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"
"github.com/wundergraph/cosmo/router/internal/debug"
"github.com/wundergraph/cosmo/router/internal/docker"
"github.com/wundergraph/cosmo/router/internal/graphiql"
"github.com/wundergraph/cosmo/router/internal/graphqlmetrics"
"github.com/wundergraph/cosmo/router/internal/persistedoperation"
"github.com/wundergraph/cosmo/router/internal/persistedoperation/apq"
"github.com/wundergraph/cosmo/router/internal/persistedoperation/operationstorage/cdn"
"github.com/wundergraph/cosmo/router/internal/persistedoperation/operationstorage/s3"
"github.com/wundergraph/cosmo/router/internal/retrytransport"
"github.com/wundergraph/cosmo/router/internal/stringsx"
"github.com/wundergraph/cosmo/router/pkg/config"
"github.com/wundergraph/cosmo/router/pkg/controlplane/configpoller"
"github.com/wundergraph/cosmo/router/pkg/controlplane/selfregister"
"github.com/wundergraph/cosmo/router/pkg/cors"
"github.com/wundergraph/cosmo/router/pkg/execution_config"
"github.com/wundergraph/cosmo/router/pkg/health"
rmetric "github.com/wundergraph/cosmo/router/pkg/metric"
"github.com/wundergraph/cosmo/router/pkg/otel/otelconfig"
"github.com/wundergraph/cosmo/router/pkg/statistics"
rtrace "github.com/wundergraph/cosmo/router/pkg/trace"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/zap"
"github.com/wundergraph/cosmo/router/pkg/watcher"
"github.com/wundergraph/graphql-go-tools/v2/pkg/netpoll"
)

type IPAnonymizationMethod string
Expand Down Expand Up @@ -606,17 +602,7 @@ func (r *Router) newServer(ctx context.Context, cfg *nodev1.RouterConfig) error
return nil
}

func (r *Router) listenAndServe(cfg *nodev1.RouterConfig) error {
r.logger.Info("Server listening and serving",
zap.String("listen_addr", r.listenAddr),
zap.Bool("playground", r.playgroundConfig.Enabled),
zap.Bool("introspection", r.introspection),
zap.String("config_version", cfg.GetVersion()),
)

// Mark the server as ready
r.httpServer.healthcheck.SetReady(true)

func (r *Router) listenAndServe() error {
go func() {
// Mark the server as not ready when the server is stopped
defer r.httpServer.healthcheck.SetReady(false)
Expand Down Expand Up @@ -717,13 +703,16 @@ func (r *Router) NewServer(ctx context.Context) (Server, error) {
}

r.httpServer = newServer(&httpServerOptions{
addr: r.listenAddr,
logger: r.logger,
tlsConfig: r.tlsConfig,
tlsServerConfig: r.tlsServerConfig,
healthcheck: r.healthcheck,
baseURL: r.baseURL,
maxHeaderBytes: int(r.routerTrafficConfig.MaxHeaderBytes.Uint64()),
addr: r.listenAddr,
logger: r.logger,
tlsConfig: r.tlsConfig,
tlsServerConfig: r.tlsServerConfig,
healthcheck: r.healthcheck,
baseURL: r.baseURL,
maxHeaderBytes: int(r.routerTrafficConfig.MaxHeaderBytes.Uint64()),
livenessCheckPath: r.livenessCheckPath,
readinessCheckPath: r.readinessCheckPath,
healthCheckPath: r.healthCheckPath,
})

// Start the server with the static config without polling
Expand Down Expand Up @@ -1054,25 +1043,39 @@ func (r *Router) Start(ctx context.Context) error {
}

r.httpServer = newServer(&httpServerOptions{
addr: r.listenAddr,
logger: r.logger,
tlsConfig: r.tlsConfig,
tlsServerConfig: r.tlsServerConfig,
healthcheck: r.healthcheck,
baseURL: r.baseURL,
maxHeaderBytes: int(r.routerTrafficConfig.MaxHeaderBytes.Uint64()),
addr: r.listenAddr,
logger: r.logger,
tlsConfig: r.tlsConfig,
tlsServerConfig: r.tlsServerConfig,
healthcheck: r.healthcheck,
baseURL: r.baseURL,
maxHeaderBytes: int(r.routerTrafficConfig.MaxHeaderBytes.Uint64()),
livenessCheckPath: r.livenessCheckPath,
readinessCheckPath: r.readinessCheckPath,
healthCheckPath: r.healthCheckPath,
})

// Start the server with the static config without polling
if r.staticExecutionConfig != nil {
if err := r.newServer(ctx, r.staticExecutionConfig); err != nil {
if err := r.listenAndServe(); err != nil {
return err
}

if err := r.listenAndServe(r.staticExecutionConfig); err != nil {
if err := r.newServer(ctx, r.staticExecutionConfig); err != nil {
return err
}

defer func() {
r.httpServer.healthcheck.SetReady(true)

r.logger.Info("Server initialized and ready to serve requests",
zap.String("listen_addr", r.listenAddr),
zap.Bool("playground", r.playgroundConfig.Enabled),
zap.Bool("introspection", r.introspection),
zap.String("config_version", r.staticExecutionConfig.Version),
)
}()

if r.executionConfig != nil && r.executionConfig.Watch {

w, err := watcher.NewWatcher(r.logger.With(zap.String("watcher", "execution_config")))
Expand Down Expand Up @@ -1136,6 +1139,11 @@ func (r *Router) Start(ctx context.Context) error {
return fmt.Errorf("failed to get initial execution config: %w", err)
}

if err := r.listenAndServe(); err != nil {
r.logger.Error("Failed to start server with initial config", zap.Error(err))
return err
}

if err := r.newServer(ctx, cfg.Config); err != nil {
return err
}
Expand Down Expand Up @@ -1172,11 +1180,6 @@ func (r *Router) Start(ctx context.Context) error {
)
}

if err := r.listenAndServe(cfg.Config); err != nil {
r.logger.Error("Failed to start server with initial config", zap.Error(err))
return err
}

r.configPoller.Subscribe(ctx, func(newConfig *nodev1.RouterConfig, oldVersion string) error {
if r.shutdown.Load() {
r.logger.Warn("Router is in shutdown state. Skipping config update")
Expand All @@ -1190,6 +1193,16 @@ func (r *Router) Start(ctx context.Context) error {
return nil
})

// Mark the server as ready
r.httpServer.healthcheck.SetReady(true)

r.logger.Info("Server initialized and ready to serve requests",
zap.String("listen_addr", r.listenAddr),
zap.Bool("playground", r.playgroundConfig.Enabled),
zap.Bool("introspection", r.introspection),
zap.String("config_version", cfg.Config.GetVersion()),
)

return nil
}

Expand Down