Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions v2/pkg/engine/datasource/graphql_datasource/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,18 @@ type ConfigurationInput struct {
SchemaConfiguration *SchemaConfiguration
CustomScalarTypeFields []SingleTypeField

GRPC *grpcdatasource.GRPCConfiguration
GRPC *grpcdatasource.GRPCConfiguration
Connect *ConnectConfiguration
}

// ConnectConfiguration holds Connect protocol-specific configuration.
// The GRPC field in ConfigurationInput is still used for Mapping and Compiler,
// as Connect shares the same protobuf schema definitions.
type ConnectConfiguration struct {
// BaseURL is the base URL of the Connect service (e.g., "http://localhost:8080").
BaseURL string
// Encoding specifies the serialization format (Protobuf or JSON).
Encoding grpcdatasource.ConnectEncoding
}

type Configuration struct {
Expand All @@ -29,7 +40,8 @@ type Configuration struct {
schemaConfiguration SchemaConfiguration
customScalarTypeFields []SingleTypeField

grpc *grpcdatasource.GRPCConfiguration
grpc *grpcdatasource.GRPCConfiguration
connect *ConnectConfiguration
}

func NewConfiguration(input ConfigurationInput) (Configuration, error) {
Expand All @@ -46,8 +58,8 @@ func NewConfiguration(input ConfigurationInput) (Configuration, error) {

cfg.schemaConfiguration = *input.SchemaConfiguration

if input.Fetch == nil && input.Subscription == nil && input.GRPC == nil {
return Configuration{}, errors.New("fetch or subscription or grpc configuration is required")
if input.Fetch == nil && input.Subscription == nil && input.GRPC == nil && input.Connect == nil {
return Configuration{}, errors.New("fetch or subscription or grpc or connect configuration is required")
}

if input.Fetch != nil {
Expand Down Expand Up @@ -76,6 +88,15 @@ func NewConfiguration(input ConfigurationInput) (Configuration, error) {
cfg.grpc = input.GRPC
}

if input.Connect != nil {
cfg.connect = input.Connect
// Connect uses the same GRPC mapping/compiler for proto schema definitions.
// Ensure GRPC config is also provided when using Connect.
if input.GRPC == nil {
return Configuration{}, errors.New("GRPC configuration (mapping/compiler) is required when using Connect")
}
}

return cfg, nil
}

Expand All @@ -99,6 +120,10 @@ func (c *Configuration) IsGRPC() bool {
return c.grpc != nil
}

func (c *Configuration) IsConnect() bool {
return c.connect != nil
}

type SingleTypeField struct {
TypeName string
FieldName string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ type Planner[T Configuration] struct {

minifier *astminify.Minifier

// gRPC
grpcClient grpc.ClientConnInterface
// gRPC / Connect
grpcClient grpc.ClientConnInterface
connectTransport grpcdatasource.RPCTransport

printKitPool *sync.Pool
}
Expand Down Expand Up @@ -360,7 +361,15 @@ func (p *Planner[T]) ConfigureFetch() resolve.FetchConfiguration {
return resolve.FetchConfiguration{}
}

dataSource, err = grpcdatasource.NewDataSource(p.grpcClient, grpcdatasource.DataSourceConfig{
// Determine which transport to use: Connect or gRPC.
var transport grpcdatasource.RPCTransport
if p.connectTransport != nil {
transport = p.connectTransport
} else {
transport = grpcdatasource.NewGRPCTransport(p.grpcClient)
}

dataSource, err = grpcdatasource.NewDataSource(transport, grpcdatasource.DataSourceConfig{
Operation: &opDocument,
Definition: p.config.schemaConfiguration.upstreamSchemaAst,
Mapping: p.config.grpc.Mapping,
Expand All @@ -372,7 +381,7 @@ func (p *Planner[T]) ConfigureFetch() resolve.FetchConfiguration {
SubgraphName: p.dataSourceConfig.Name(),
})
if err != nil {
p.stopWithError(errors.WithStack(fmt.Errorf("failed to create gRPC datasource: %w", err)))
p.stopWithError(errors.WithStack(fmt.Errorf("failed to create datasource: %w", err)))
return resolve.FetchConfiguration{}
}
}
Expand Down Expand Up @@ -1728,10 +1737,11 @@ func getRelaxedPrintKitPool() *sync.Pool {
}

type Factory[T Configuration] struct {
executionContext context.Context
executionContext context.Context
httpClient *http.Client
grpcClient grpc.ClientConnInterface
grpcClientProvider func() grpc.ClientConnInterface
connectTransport grpcdatasource.RPCTransport
subscriptionClient GraphQLSubscriptionClient
printKitPool *sync.Pool
}
Expand Down Expand Up @@ -1795,6 +1805,23 @@ func NewFactoryGRPCClientProvider(executionContext context.Context, clientProvid
}, nil
}

// NewFactoryConnect creates a Connect protocol factory for the GraphQL datasource planner.
// It uses the Connect protocol (HTTP-native RPC) instead of gRPC for transport.
func NewFactoryConnect(executionContext context.Context, connectTransport grpcdatasource.RPCTransport) (*Factory[Configuration], error) {
if executionContext == nil {
return nil, fmt.Errorf("execution context is required")
}

if connectTransport == nil {
return nil, fmt.Errorf("connect transport is required")
}

return &Factory[Configuration]{
executionContext: executionContext,
connectTransport: connectTransport,
}, nil
}

func (p *Planner[T]) getKit() *printKit {
pool := p.printKitPool
if pool == nil {
Expand Down Expand Up @@ -1837,6 +1864,7 @@ func (f *Factory[T]) Planner(logger abstractlogger.Logger) plan.DataSourcePlanne
return &Planner[T]{
fetchClient: f.httpClient,
grpcClient: grpcClient,
connectTransport: f.connectTransport,
subscriptionClient: f.subscriptionClient,
printKitPool: f.getPrintKitPool(),
}
Expand All @@ -1862,7 +1890,7 @@ func (f *Factory[T]) PlanningBehavior() plan.DataSourcePlanningBehavior {
MergeAliasedRootNodes: true,
OverrideFieldPathFromAlias: true,
AllowPlanningTypeName: true,
AlwaysFlattenFragments: f.grpcClient != nil || f.grpcClientProvider != nil,
AlwaysFlattenFragments: f.grpcClient != nil || f.grpcClientProvider != nil || f.connectTransport != nil,
}
return b
}
Expand Down
16 changes: 11 additions & 5 deletions v2/pkg/engine/datasource/grpc_datasource/grpc_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var _ resolve.DataSource = (*DataSource)(nil)
// transforms the responses back to GraphQL format.
type DataSource struct {
plan *RPCExecutionPlan
cc grpc.ClientConnInterface
transport RPCTransport
rc *RPCCompiler
mapping *GRPCMapping
federationConfigs plan.FederationFieldConfigurations
Expand All @@ -66,8 +66,8 @@ type DataSourceConfig struct {
Disabled bool
}

// NewDataSource creates a new gRPC datasource
func NewDataSource(client grpc.ClientConnInterface, config DataSourceConfig) (*DataSource, error) {
// NewDataSource creates a new datasource with the given RPCTransport.
func NewDataSource(transport RPCTransport, config DataSourceConfig) (*DataSource, error) {
planner, err := NewPlanner(config.SubgraphName, config.Mapping, config.FederationConfigs)
if err != nil {
return nil, err
Expand All @@ -79,7 +79,7 @@ func NewDataSource(client grpc.ClientConnInterface, config DataSourceConfig) (*D

return &DataSource{
plan: plan,
cc: client,
transport: transport,
rc: config.Compiler,
mapping: config.Mapping,
federationConfigs: config.FederationConfigs,
Expand All @@ -88,6 +88,12 @@ func NewDataSource(client grpc.ClientConnInterface, config DataSourceConfig) (*D
}, nil
}

// NewDataSourceGRPC creates a new gRPC datasource using a gRPC ClientConnInterface.
// This is a convenience function that wraps the connection in a grpcTransport.
func NewDataSourceGRPC(client grpc.ClientConnInterface, config DataSourceConfig) (*DataSource, error) {
return NewDataSource(NewGRPCTransport(client), config)
}

// Load implements resolve.DataSource interface.
// It processes the input JSON data to make gRPC calls and returns
// the response data.
Expand Down Expand Up @@ -149,7 +155,7 @@ func (d *DataSource) Load(ctx context.Context, headers http.Header, input []byte
errGrp.Go(func() error {
// Invoke the gRPC method - this will populate serviceCall.Output

err := d.cc.Invoke(errGrpCtx, serviceCall.MethodFullName(), serviceCall.Input, serviceCall.Output)
err := d.transport.Invoke(errGrpCtx, serviceCall.MethodFullName(), serviceCall.Input, serviceCall.Output)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func Test_DataSource_Load_WithEntity_Calls(t *testing.T) {
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSourceGRPC(conn, DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -478,7 +478,7 @@ func Test_DataSource_Load_WithEntity_Calls_WithCompositeTypes(t *testing.T) {
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSourceGRPC(conn, DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -1267,7 +1267,7 @@ func Test_DataSource_Load_WithEntity_Calls_And_Requires(t *testing.T) {
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSourceGRPC(conn, DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -1682,7 +1682,7 @@ func Test_DataSource_Load_WithEntity_Calls_And_Requires_And_FieldResolvers(t *te
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSourceGRPC(conn, DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func Test_DataSource_Load_NullMetrics_NestedResolversNotInvoked(t *testing.T) {
compiler, err := NewProtoCompiler(grpctest.MustProtoSchema(t), testMapping())
require.NoError(t, err)

ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSourceGRPC(conn, DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -191,7 +191,7 @@ func Test_DataSource_Load_NullCategory_FieldResolversNotInvoked(t *testing.T) {
compiler, err := NewProtoCompiler(grpctest.MustProtoSchema(t), testMapping())
require.NoError(t, err)

ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSourceGRPC(conn, DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -226,7 +226,7 @@ func Test_DataSource_Load_ArgumentLessFieldResolversCalled(t *testing.T) {
compiler, err := NewProtoCompiler(grpctest.MustProtoSchema(t), testMapping())
require.NoError(t, err)

ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -267,7 +267,7 @@ func Test_DataSource_Load_NullCategory_ArgumentLessFieldResolversNotInvoked(t *t
compiler, err := NewProtoCompiler(grpctest.MustProtoSchema(t), testMapping())
require.NoError(t, err)

ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down
Loading