Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions router/core/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/wundergraph/cosmo/router/pkg/grpcconnector"
pubsub_datasource "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource"

grpcdatasource "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/grpc_datasource"

"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
"github.com/wundergraph/graphql-go-tools/v2/pkg/astparser"
"github.com/wundergraph/graphql-go-tools/v2/pkg/asttransform"
Expand All @@ -37,6 +39,7 @@ type ExecutorConfigurationBuilder struct {
instanceData InstanceData

subscriptionHooks subscriptionHooks
connectTransports map[string]grpcdatasource.RPCTransport
}

type Executor struct {
Expand Down Expand Up @@ -218,6 +221,7 @@ func (b *ExecutorConfigurationBuilder) buildPlannerConfiguration(ctx context.Con
b.logger,
routerEngineCfg.Execution.EnableNetPoll,
b.instanceData,
b.connectTransports,
), b.logger, b.subscriptionHooks)

// this generates the plan config using the data source factories from the config package
Expand Down
11 changes: 11 additions & 0 deletions router/core/factoryresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type DefaultFactoryResolver struct {
transportFactory ApiTransportFactory
defaultSubgraphRequestTimeout time.Duration
subscriptionClientOptions []graphql_datasource.Options
connectTransports map[string]grpcdatasource.RPCTransport
}

func NewDefaultFactoryResolver(
Expand All @@ -89,6 +90,7 @@ func NewDefaultFactoryResolver(
log *zap.Logger,
enableNetPoll bool,
instanceData InstanceData,
connectTransports map[string]grpcdatasource.RPCTransport,
) *DefaultFactoryResolver {
transportFactory := NewTransport(transportOptions)

Expand Down Expand Up @@ -164,10 +166,19 @@ func NewDefaultFactoryResolver(
transportFactory: transportFactory,
defaultSubgraphRequestTimeout: transportOptions.SubgraphTransportOptions.RequestTimeout,
subscriptionClientOptions: options,
connectTransports: connectTransports,
}
}

func (d *DefaultFactoryResolver) ResolveGraphqlFactory(subgraphName string) (plan.PlannerFactory[graphql_datasource.Configuration], error) {
// Check Connect transports first — they use HTTP via the Connect protocol
// instead of native gRPC, so they bypass the gRPC connector entirely.
if d.connectTransports != nil {
if ct, ok := d.connectTransports[subgraphName]; ok {
return graphql_datasource.NewFactoryConnect(d.engineCtx, ct)
}
}

if d.connector != nil {
// If the connector is not nil, we try to get the provider for the subgraph.
// In case of a provider, we use the gRPC client provider to create the factory.
Expand Down
80 changes: 77 additions & 3 deletions router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/wundergraph/cosmo/router/pkg/cors"
"github.com/wundergraph/cosmo/router/pkg/execution_config"
"github.com/wundergraph/cosmo/router/pkg/grpcconnector"
"github.com/wundergraph/cosmo/router/pkg/grpcprotocol"
"github.com/wundergraph/cosmo/router/pkg/grpcconnector/grpccommon"
"github.com/wundergraph/cosmo/router/pkg/grpcconnector/grpcplugin"
"github.com/wundergraph/cosmo/router/pkg/grpcconnector/grpcpluginoci"
Expand All @@ -57,6 +58,7 @@ import (
"github.com/wundergraph/cosmo/router/pkg/statistics"
rtrace "github.com/wundergraph/cosmo/router/pkg/trace"

grpcdatasource "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/grpc_datasource"
"github.com/wundergraph/graphql-go-tools/v2/pkg/astparser"
)

Expand Down Expand Up @@ -104,6 +106,7 @@ type (
connector *grpcconnector.Connector
circuitBreakerManager *circuit.Manager
headerPropagation *HeaderPropagation
grpcProtocolConfig *config.GRPCProtocolConfig
}
)

Expand Down Expand Up @@ -175,6 +178,10 @@ func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterC
}
}

if err := grpcprotocol.Validate(r.grpcProtocol); err != nil {
return nil, fmt.Errorf("invalid grpc_protocol configuration: %w", err)
}

ctx, cancel := context.WithCancel(ctx)
s := &graphServer{
context: ctx,
Expand All @@ -192,8 +199,9 @@ func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterC
HostName: r.hostName,
ListenAddress: r.listenAddr,
},
storageProviders: &r.storageProviders,
headerPropagation: r.headerPropagation,
storageProviders: &r.storageProviders,
headerPropagation: r.headerPropagation,
grpcProtocolConfig: r.grpcProtocol,
}

baseOtelAttributes := []attribute.KeyValue{
Expand Down Expand Up @@ -1242,7 +1250,42 @@ func (s *graphServer) buildGraphMux(
subgraphTippers[subgraph] = subgraphTransport
}

if err := s.setupConnector(ctx, opts.EngineConfig, opts.ConfigSubgraphs, telemetryAttExpressions, tracingAttExpressions); err != nil {
// Build HTTP clients for Connect subgraphs, matching the timeout and transport
// configuration applied to regular GraphQL subgraph clients.
connectDefaultHTTPClient := &http.Client{
Transport: s.baseTransport,
Timeout: s.subgraphTransportOptions.RequestTimeout,
}
connectSubgraphHTTPClients := map[string]*http.Client{}
for subgraph, subgraphOpts := range s.subgraphTransportOptions.SubgraphMap {
transport, ok := s.subgraphTransports[subgraph]
if !ok {
transport = s.baseTransport
}
connectSubgraphHTTPClients[subgraph] = &http.Client{
Transport: transport,
Timeout: subgraphOpts.RequestTimeout,
}
}
// Include subgraphs with per-subgraph TLS but no traffic shaping overrides.
for subgraph, transport := range s.subgraphTransports {
if _, exists := connectSubgraphHTTPClients[subgraph]; !exists {
connectSubgraphHTTPClients[subgraph] = &http.Client{
Transport: transport,
Timeout: s.subgraphTransportOptions.RequestTimeout,
}
}
}

// Build Connect transports for subgraphs configured to use ConnectRPC protocol.
connectTransports := grpcprotocol.BuildConnectTransports(
s.grpcProtocolConfig,
collectGRPCSubgraphURLs(opts.EngineConfig, opts.ConfigSubgraphs),
connectSubgraphHTTPClients,
connectDefaultHTTPClient,
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if err := s.setupConnector(ctx, opts.EngineConfig, opts.ConfigSubgraphs, telemetryAttExpressions, tracingAttExpressions, connectTransports); err != nil {
return nil, fmt.Errorf("failed to setup plugin host: %w", err)
}

Expand Down Expand Up @@ -1288,6 +1331,7 @@ func (s *graphServer) buildGraphMux(
CircuitBreaker: s.circuitBreakerManager,
},
subscriptionHooks: s.subscriptionHooks,
connectTransports: connectTransports,
}

executor, providers, err := ecb.Build(
Expand Down Expand Up @@ -1623,6 +1667,7 @@ func (s *graphServer) setupConnector(
configSubgraphs []*nodev1.Subgraph,
telemetryAttributeExpressions *attributeExpressions,
tracingAttributeExpressions *attributeExpressions,
connectTransports map[string]grpcdatasource.RPCTransport,
) error {
s.connector = grpcconnector.NewConnector()

Expand All @@ -1645,6 +1690,14 @@ func (s *graphServer) setupConnector(
return fmt.Errorf("subgraph %s not found", dsConfig.Id)
}

// Skip gRPC connector registration for Connect subgraphs —
// they use HTTP via the Connect protocol instead of native gRPC.
if connectTransports != nil {
if _, isConnect := connectTransports[sg.Name]; isConnect {
continue
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

pluginConfig := grpcConfig.GetPlugin()
if pluginConfig == nil {
remoteProvider, err := grpcremote.NewRemoteGRPCProvider(grpcremote.RemoteGRPCProviderConfig{
Expand Down Expand Up @@ -2049,3 +2102,24 @@ func configureSubgraphOverwrites(

return subgraphs, nil
}

// collectGRPCSubgraphURLs returns a map of subgraphName → routingUrl
// for all subgraphs that have gRPC configuration in the engine config.
func collectGRPCSubgraphURLs(
engineConfig *nodev1.EngineConfiguration,
configSubgraphs []*nodev1.Subgraph,
) map[string]string {
urls := map[string]string{}
for _, dsConfig := range engineConfig.DatasourceConfigurations {
if dsConfig.GetCustomGraphql().GetGrpc() == nil {
continue
}
for _, sg := range configSubgraphs {
if sg.Id == dsConfig.Id {
urls[sg.Name] = sg.RoutingUrl
break
}
}
}
return urls
}
6 changes: 6 additions & 0 deletions router/core/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2264,6 +2264,12 @@ func WithConnectRPC(cfg config.ConnectRPCConfiguration) Option {
}
}

func WithGRPCProtocol(cfg *config.GRPCProtocolConfig) Option {
return func(r *Router) {
r.grpcProtocol = cfg
}
}

func WithDemoMode(demoMode bool) Option {
return func(r *Router) {
r.demoMode = demoMode
Expand Down
1 change: 1 addition & 0 deletions router/core/router_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ type Config struct {
mcp config.MCPConfiguration
connectRPC config.ConnectRPCConfiguration
plugins config.PluginsConfiguration
grpcProtocol *config.GRPCProtocolConfig
tracingAttributes []config.CustomAttribute
subscriptionHooks subscriptionHooks
}
Expand Down
1 change: 1 addition & 0 deletions router/core/supervisor_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func optionsFromResources(logger *zap.Logger, config *config.Config, reloadPersi
WithMCP(config.MCP),
WithConnectRPC(config.ConnectRPC),
WithPlugins(config.Plugins),
WithGRPCProtocol(config.GRPCProtocol),
WithDemoMode(config.DemoMode),
WithStreamsHandlerConfiguration(config.Events.Handlers),
WithReloadPersistentState(reloadPersistentState),
Expand Down
3 changes: 2 additions & 1 deletion router/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,5 @@ replace (
// Remember you can use Go workspaces to avoid using replace directives in multiple go.mod files
// Use what is best for your personal workflow. See CONTRIBUTING.md for more information

// replace github.com/wundergraph/graphql-go-tools/v2 => ../../graphql-go-tools/v2
// TODO: Update to released version once wundergraph/graphql-go-tools#1453 is merged and tagged.
replace github.com/wundergraph/graphql-go-tools/v2 => github.com/fengyuwusong/graphql-go-tools/v2 v2.0.0-20260319034538-12c891d918df
Comment thread
coderabbitai[bot] marked this conversation as resolved.
4 changes: 2 additions & 2 deletions router/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fengyuwusong/graphql-go-tools/v2 v2.0.0-20260319034538-12c891d918df h1:4wW9/8mELQS2qGPaz4br5bjZwxtm+OLK1T4AEWZi9Xo=
github.com/fengyuwusong/graphql-go-tools/v2 v2.0.0-20260319034538-12c891d918df/go.mod h1:HjTAO/cuICpu31IfHY9qmSPygx6Gza7Wt9hTSReTI+A=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618=
Expand Down Expand Up @@ -329,8 +331,6 @@ github.com/wundergraph/astjson v1.1.0 h1:xORDosrZ87zQFJwNGe/HIHXqzpdHOFmqWgykCLV
github.com/wundergraph/astjson v1.1.0/go.mod h1:h12D/dxxnedtLzsKyBLK7/Oe4TAoGpRVC9nDpDrZSWw=
github.com/wundergraph/go-arena v1.1.0 h1:9+wSRkJAkA2vbYHp6s8tEGhPViRGQNGXqPHT0QzhdIc=
github.com/wundergraph/go-arena v1.1.0/go.mod h1:ROOysEHWJjLQ8FSfNxZCziagb7Qw2nXY3/vgKRh7eWw=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.265 h1:KVmojt3oH13VX8Yr8NZ+fuOiruLyznderHITJs1MyWE=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.265/go.mod h1:HjTAO/cuICpu31IfHY9qmSPygx6Gza7Wt9hTSReTI+A=
github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4=
github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
Expand Down
20 changes: 20 additions & 0 deletions router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,24 @@ type MCPServer struct {
BaseURL string `yaml:"base_url,omitempty" env:"MCP_SERVER_BASE_URL"`
}

// GRPCProtocolConfig configures the transport protocol for gRPC subgraphs.
// By default all gRPC subgraphs use native gRPC (HTTP/2 + Protobuf).
// Setting the protocol to "connectrpc" switches a subgraph to the Connect protocol (HTTP/1.1).
type GRPCProtocolConfig struct {
// Default protocol for all gRPC subgraphs ("grpc" or "connectrpc").
Default string `yaml:"default,omitempty" json:"default,omitempty"`
// DefaultEncoding for Connect subgraphs ("proto" or "json").
DefaultEncoding string `yaml:"default_encoding,omitempty" json:"default_encoding,omitempty"`
// Per-subgraph protocol and encoding overrides.
Subgraphs map[string]SubgraphGRPCProtocolConfig `yaml:"subgraphs,omitempty" json:"subgraphs,omitempty"`
}

// SubgraphGRPCProtocolConfig holds per-subgraph protocol/encoding settings.
type SubgraphGRPCProtocolConfig struct {
Protocol string `yaml:"protocol,omitempty" json:"protocol,omitempty"`
Encoding string `yaml:"encoding,omitempty" json:"encoding,omitempty"`
}

type ConnectRPCConfiguration struct {
Enabled bool `yaml:"enabled" envDefault:"false" env:"CONNECT_RPC_ENABLED"`
Server ConnectRPCServer `yaml:"server,omitempty" envPrefix:"CONNECT_RPC_SERVER_"`
Expand Down Expand Up @@ -1222,6 +1240,8 @@ type Config struct {
Plugins PluginsConfiguration `yaml:"plugins" envPrefix:"PLUGINS_"`

WatchConfig WatchConfig `yaml:"watch_config" envPrefix:"WATCH_CONFIG_"`

GRPCProtocol *GRPCProtocolConfig `yaml:"grpc_protocol,omitempty"`
}

type WatchConfig struct {
Expand Down
65 changes: 65 additions & 0 deletions router/pkg/grpcprotocol/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package grpcprotocol

import (
"fmt"

"github.com/wundergraph/cosmo/router/pkg/config"
)

const (
ProtocolGRPC = "grpc"
ProtocolConnectRPC = "connectrpc"

EncodingProto = "proto"
EncodingJSON = "json"
)

// Validate checks that all config values in a GRPCProtocolConfig are valid.
func Validate(cfg *config.GRPCProtocolConfig) error {
if cfg == nil {
return nil
}
if cfg.Default != "" && cfg.Default != ProtocolGRPC && cfg.Default != ProtocolConnectRPC {
return fmt.Errorf("grpc_protocol.default: invalid value %q, must be %q or %q", cfg.Default, ProtocolGRPC, ProtocolConnectRPC)
}
if cfg.DefaultEncoding != "" && cfg.DefaultEncoding != EncodingProto && cfg.DefaultEncoding != EncodingJSON {
return fmt.Errorf("grpc_protocol.default_encoding: invalid value %q, must be %q or %q", cfg.DefaultEncoding, EncodingProto, EncodingJSON)
}
for name, sg := range cfg.Subgraphs {
if sg.Protocol != "" && sg.Protocol != ProtocolGRPC && sg.Protocol != ProtocolConnectRPC {
return fmt.Errorf("grpc_protocol.subgraphs.%s.protocol: invalid value %q", name, sg.Protocol)
}
if sg.Encoding != "" && sg.Encoding != EncodingProto && sg.Encoding != EncodingJSON {
return fmt.Errorf("grpc_protocol.subgraphs.%s.encoding: invalid value %q", name, sg.Encoding)
}
}
return nil
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// ResolveProtocol returns the effective protocol for a subgraph.
func ResolveProtocol(cfg *config.GRPCProtocolConfig, subgraphName string) string {
if cfg == nil {
return ProtocolGRPC
}
if sg, ok := cfg.Subgraphs[subgraphName]; ok && sg.Protocol != "" {
return sg.Protocol
}
if cfg.Default != "" {
return cfg.Default
}
return ProtocolGRPC
}

// ResolveEncoding returns the effective encoding for a subgraph.
func ResolveEncoding(cfg *config.GRPCProtocolConfig, subgraphName string) string {
if cfg == nil {
return EncodingProto
}
if sg, ok := cfg.Subgraphs[subgraphName]; ok && sg.Encoding != "" {
return sg.Encoding
}
if cfg.DefaultEncoding != "" {
return cfg.DefaultEncoding
}
return EncodingProto
}
Loading
Loading